diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java new file mode 100644 index 0000000000..543051a500 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactory.java @@ -0,0 +1,131 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.opensearch.client.opensearch._types.VersionType; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.CreateOperation; +import org.opensearch.client.opensearch.core.bulk.DeleteOperation; +import org.opensearch.client.opensearch.core.bulk.IndexOperation; +import org.opensearch.client.opensearch.core.bulk.UpdateOperation; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; + +import java.io.IOException; +import java.util.Optional; + +public class BulkOperationFactory { + + private final VersionType versionType; + private final ScriptManager scriptManager; + private final ObjectMapper objectMapper; + private final boolean usingDocumentFilters; + + public BulkOperationFactory(final VersionType versionType, + final ScriptManager scriptManager, + final ObjectMapper objectMapper, + final boolean usingDocumentFilters) { + this.versionType = versionType; + this.scriptManager = scriptManager; + this.objectMapper = objectMapper; + this.usingDocumentFilters = usingDocumentFilters; + } + + public BulkOperation create(final String action, + final SerializedJson document, + final Long version, + final String indexName, + final JsonNode jsonNode) { + final Optional docId = document.getDocumentId(); + final Optional routing = document.getRoutingField(); + final Optional pipeline = document.getPipelineField(); + + if (StringUtils.equals(action, OpenSearchBulkActions.CREATE.toString())) { + final CreateOperation.Builder builder = + new CreateOperation.Builder<>() + .index(indexName) + .document(document); + docId.ifPresent(builder::id); + routing.ifPresent(builder::routing); + pipeline.ifPresent(builder::pipeline); + return new BulkOperation.Builder().create(builder.build()).build(); + } + + if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) || + StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString())) { + return createUpdateOperation(action, document, version, indexName, jsonNode, docId, routing); + } + + if (StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) { + final DeleteOperation.Builder builder = new DeleteOperation.Builder() + .index(indexName) + .versionType(versionType) + .version(version); + docId.ifPresent(builder::id); + routing.ifPresent(builder::routing); + return new BulkOperation.Builder().delete(builder.build()).build(); + } + + // Default to "index" + final IndexOperation.Builder builder = new IndexOperation.Builder<>() + .index(indexName) + .document(document) + .version(version) + .versionType(versionType); + docId.ifPresent(builder::id); + routing.ifPresent(builder::routing); + pipeline.ifPresent(builder::pipeline); + return new BulkOperation.Builder().index(builder.build()).build(); + } + + private BulkOperation createUpdateOperation(final String action, + final SerializedJson document, + final Long version, + final String indexName, + final JsonNode jsonNode, + final Optional docId, + final Optional routing) { + JsonNode filteredJsonNode = jsonNode; + try { + if (usingDocumentFilters) { + filteredJsonNode = objectMapper.reader().readTree(document.getSerializedJson()); + } + } catch (final IOException e) { + throw new RuntimeException( + String.format("An exception occurred while deserializing a document for the %s action: %s", action, e.getMessage())); + } + + final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString()); + final UpdateOperation.Builder builder = new UpdateOperation.Builder<>() + .index(indexName) + .versionType(versionType) + .version(version); + + if (scriptManager.isScriptEnabled()) { + builder.script(scriptManager.buildScript(filteredJsonNode, document.getResolvedScriptParameters().orElse(null))); + if (isUpsert) { + builder.upsert(filteredJsonNode); + builder.scriptedUpsert(true); + } + } else if (isUpsert) { + builder.document(filteredJsonNode).upsert(filteredJsonNode); + } else { + builder.document(filteredJsonNode); + } + + docId.ifPresent(builder::id); + routing.ifPresent(builder::routing); + return new BulkOperation.Builder().update(builder.build()).build(); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index ace23d3260..8cdce533c9 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -10,7 +10,6 @@ package org.opensearch.dataprepper.plugins.sink.opensearch; import com.google.common.annotations.VisibleForTesting; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; @@ -21,10 +20,6 @@ import org.opensearch.client.opensearch._types.VersionType; import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.bulk.BulkOperation; -import org.opensearch.client.opensearch.core.bulk.CreateOperation; -import org.opensearch.client.opensearch.core.bulk.DeleteOperation; -import org.opensearch.client.opensearch.core.bulk.IndexOperation; -import org.opensearch.client.opensearch.core.bulk.UpdateOperation; import org.opensearch.client.transport.TransportOptions; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; @@ -140,6 +135,7 @@ public class OpenSearchSink extends AbstractSink> { private final String action; private final List actions; private final ScriptManager scriptManager; + private final BulkOperationFactory bulkOperationFactory; private final String documentRootKey; private String configuredIndexAlias; private final ReentrantLock lock; @@ -221,6 +217,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.lastFlushTimeMap = new ConcurrentHashMap<>(); this.pluginConfigObservable = pluginConfigObservable; this.objectMapper = new ObjectMapper(); + this.bulkOperationFactory = new BulkOperationFactory(versionType, scriptManager, objectMapper, isUsingDocumentFilters()); this.queryExecutorService = openSearchSinkConfig.getIndexConfiguration().getQueryTerm() != null ? Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("existing-document-query-manager")) : null; @@ -344,96 +341,6 @@ public boolean isReady() { return initialized; } - BulkOperation getBulkOperationForAction(final String action, - final SerializedJson document, - final Long version, - final String indexName, - final JsonNode jsonNode) { - BulkOperation bulkOperation; - final Optional docId = document.getDocumentId(); - final Optional routing = document.getRoutingField(); - final Optional pipeline = document.getPipelineField(); - - if (StringUtils.equals(action, OpenSearchBulkActions.CREATE.toString())) { - final CreateOperation.Builder createOperationBuilder = - new CreateOperation.Builder<>() - .index(indexName) - .document(document); - docId.ifPresent(createOperationBuilder::id); - routing.ifPresent(createOperationBuilder::routing); - pipeline.ifPresent(createOperationBuilder::pipeline); - - bulkOperation = new BulkOperation.Builder() - .create(createOperationBuilder.build()) - .build(); - return bulkOperation; - } - if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) || - StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString())) { - - JsonNode filteredJsonNode = jsonNode; - try { - if (isUsingDocumentFilters()) { - filteredJsonNode = objectMapper.reader().readTree(document.getSerializedJson()); - } - } catch (final IOException e) { - throw new RuntimeException( - String.format("An exception occurred while deserializing a document for the %s action: %s", action, e.getMessage())); - } - - - final boolean isUpsert = StringUtils.equals(action.toLowerCase(), OpenSearchBulkActions.UPSERT.toString()); - final UpdateOperation.Builder updateOperationBuilder = new UpdateOperation.Builder<>() - .index(indexName) - .versionType(versionType) - .version(version); - - if (scriptManager.isScriptEnabled()) { - updateOperationBuilder.script(scriptManager.buildScript(jsonNode, document.getResolvedScriptParameters().orElse(null))); - updateOperationBuilder.upsert(filteredJsonNode); - updateOperationBuilder.scriptedUpsert(true); - } else if (isUpsert) { - updateOperationBuilder.document(filteredJsonNode).upsert(filteredJsonNode); - } else { - updateOperationBuilder.document(filteredJsonNode); - } - - docId.ifPresent(updateOperationBuilder::id); - routing.ifPresent(updateOperationBuilder::routing); - bulkOperation = new BulkOperation.Builder() - .update(updateOperationBuilder.build()) - .build(); - return bulkOperation; - } - if (StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) { - final DeleteOperation.Builder deleteOperationBuilder = - new DeleteOperation.Builder().index(indexName); - docId.ifPresent(deleteOperationBuilder::id); - routing.ifPresent(deleteOperationBuilder::routing); - bulkOperation = new BulkOperation.Builder() - .delete(deleteOperationBuilder - .versionType(versionType) - .version(version) - .build()) - .build(); - return bulkOperation; - } - // Default to "index" - final IndexOperation.Builder indexOperationBuilder = - new IndexOperation.Builder<>() - .index(indexName) - .document(document) - .version(version) - .versionType(versionType); - docId.ifPresent(indexOperationBuilder::id); - routing.ifPresent(indexOperationBuilder::routing); - pipeline.ifPresent(indexOperationBuilder::pipeline); - bulkOperation = new BulkOperation.Builder() - .index(indexOperationBuilder.build()) - .build(); - return bulkOperation; - } - @Override public void doOutput(final Collection> records) { final long threadId = Thread.currentThread().getId(); @@ -541,7 +448,7 @@ public void doOutput(final Collection> records) { BulkOperation bulkOperation; try { - bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode()); + bulkOperation = bulkOperationFactory.create(eventAction, document, version, indexName, event.getJsonNode()); } catch (final Exception e) { LOG.error("An exception occurred while constructing the bulk operation for a document: ", e); logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactoryTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactoryTest.java new file mode 100644 index 0000000000..789355b0c1 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationFactoryTest.java @@ -0,0 +1,161 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.client.opensearch._types.VersionType; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; +import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class BulkOperationFactoryTest { + + private final ObjectMapper objectMapper = new ObjectMapper(); + private BulkOperationFactory factory; + + @BeforeEach + void setUp() { + final ScriptManager scriptManager = new ScriptManager(null, null); + factory = new BulkOperationFactory(VersionType.External, scriptManager, objectMapper, false); + } + + @Test + void create_action_returns_create_operation() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"key\":\"val\"}", "doc-1", "route-1", null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.CREATE.toString(), document, null, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isCreate(), is(true)); + assertThat(result.create().index(), equalTo("test-index")); + assertThat(result.create().id(), equalTo("doc-1")); + assertThat(result.create().routing(), equalTo("route-1")); + } + + @Test + void index_action_returns_index_operation() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"key\":\"val\"}", "doc-1", null, null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.INDEX.toString(), document, 1L, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isIndex(), is(true)); + assertThat(result.index().index(), equalTo("test-index")); + assertThat(result.index().id(), equalTo("doc-1")); + assertThat(result.index().version(), equalTo(1L)); + assertThat(result.index().versionType(), equalTo(VersionType.External)); + } + + @Test + void delete_action_returns_delete_operation() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", "route-1", null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.DELETE.toString(), document, 2L, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isDelete(), is(true)); + assertThat(result.delete().index(), equalTo("test-index")); + assertThat(result.delete().id(), equalTo("doc-1")); + assertThat(result.delete().routing(), equalTo("route-1")); + assertThat(result.delete().version(), equalTo(2L)); + } + + @Test + void update_action_returns_update_operation() { + final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "test"); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"test\"}", "doc-1", null, null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.UPDATE.toString(), document, null, "test-index", jsonNode); + + assertThat(result.isUpdate(), is(true)); + assertThat(result.update().index(), equalTo("test-index")); + assertThat(result.update().id(), equalTo("doc-1")); + } + + @Test + void upsert_action_returns_update_operation() { + final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "test"); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"test\"}", "doc-1", null, null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); + + assertThat(result.isUpdate(), is(true)); + assertThat(result.update().index(), equalTo("test-index")); + assertThat(result.update().id(), equalTo("doc-1")); + } + + @Test + void unknown_action_defaults_to_index_operation() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, null); + + final BulkOperation result = factory.create( + "unknown_action", document, null, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isIndex(), is(true)); + assertThat(result.index().index(), equalTo("test-index")); + } + + @Test + void create_action_without_optional_fields() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", null, null, null); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.CREATE.toString(), document, null, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isCreate(), is(true)); + assertThat(result.create().index(), equalTo("test-index")); + } + + @Test + void update_with_document_filters_deserializes_from_serialized_json() { + final ScriptManager scriptManager = new ScriptManager(null, null); + final BulkOperationFactory filterFactory = new BulkOperationFactory(null, scriptManager, objectMapper, true); + final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "original"); + final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"filtered\"}", "doc-1", null, null); + + final BulkOperation result = filterFactory.create( + OpenSearchBulkActions.UPDATE.toString(), document, null, "test-index", jsonNode); + + assertThat(result.isUpdate(), is(true)); + assertThat(result.update().id(), equalTo("doc-1")); + } + + @Test + void create_action_sets_pipeline() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, "my-pipeline"); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.CREATE.toString(), document, null, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isCreate(), is(true)); + assertThat(result.create().pipeline(), equalTo("my-pipeline")); + } + + @Test + void index_action_sets_pipeline() { + final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, "my-pipeline"); + + final BulkOperation result = factory.create( + OpenSearchBulkActions.INDEX.toString(), document, null, "test-index", objectMapper.createObjectNode()); + + assertThat(result.isIndex(), is(true)); + assertThat(result.index().pipeline(), equalTo("my-pipeline")); + } +} diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkScriptTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkScriptTest.java index fa41a5432d..219f1fba03 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkScriptTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkScriptTest.java @@ -13,156 +13,40 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.DistributionSummary; -import io.micrometer.core.instrument.Timer; import jakarta.json.stream.JsonGenerator; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.MockedConstruction; -import org.mockito.MockedStatic; -import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.client.opensearch.core.bulk.UpdateOperation; -import org.opensearch.client.RestHighLevelClient; -import org.opensearch.client.json.jackson.JacksonJsonpMapper; -import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import org.opensearch.dataprepper.metrics.MetricNames; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.configuration.PipelineDescription; -import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; -import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; -import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; -import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig; import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.ScriptConfiguration; -import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; -import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory; -import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; -import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateType; import java.io.IOException; import java.io.StringWriter; import java.util.Map; -import java.util.Optional; -import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockConstruction; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.EXTERNAL_LATENCY; -import static org.opensearch.dataprepper.model.sink.SinkLatencyMetrics.INTERNAL_LATENCY; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_ERRORS; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_LATENCY; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.BULKREQUEST_SIZE_BYTES; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.DYNAMIC_INDEX_DROPPED_EVENTS; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.INVALID_ACTION_ERRORS; -import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.INVALID_VERSION_EXPRESSION_DROPPED_EVENTS; -import static org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig.DEFAULT_BULK_SIZE; -import static org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig.DEFAULT_FLUSH_TIMEOUT; - -@ExtendWith(MockitoExtension.class) + public class OpenSearchSinkScriptTest { private final ObjectMapper objectMapper = new ObjectMapper(); private final JacksonJsonpMapper jsonpMapper = new JacksonJsonpMapper(); - @Mock private IndexManagerFactory indexManagerFactory; - @Mock private OpenSearchClient openSearchClient; - @Mock private SinkContext sinkContext; - @Mock private PluginSetting pluginSetting; - @Mock private ExpressionEvaluator expressionEvaluator; - @Mock private AwsCredentialsSupplier awsCredentialsSupplier; - @Mock private OpenSearchSinkConfiguration openSearchSinkConfiguration; - @Mock private PipelineDescription pipelineDescription; - @Mock private IndexConfiguration indexConfiguration; - @Mock private PluginMetrics pluginMetrics; - @Mock private OpenSearchSinkConfig openSearchSinkConfig; - @Mock private PluginConfigObservable pluginConfigObservable; - @Mock private ScriptConfiguration scriptConfiguration; - - @BeforeEach - void setup() { - when(pipelineDescription.getPipelineName()).thenReturn(UUID.randomUUID().toString()); - - final RetryConfiguration retryConfiguration = mock(RetryConfiguration.class); - when(retryConfiguration.getDlq()).thenReturn(Optional.empty()); - lenient().when(retryConfiguration.getDlqFile()).thenReturn(null); - - final ConnectionConfiguration connectionConfiguration = mock(ConnectionConfiguration.class); - final RestHighLevelClient restHighLevelClient = mock(RestHighLevelClient.class); - lenient().when(connectionConfiguration.createClient(awsCredentialsSupplier)).thenReturn(restHighLevelClient); - lenient().when(connectionConfiguration.createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier)).thenReturn(openSearchClient); - - when(indexConfiguration.getDocumentId()).thenReturn(null); - when(indexConfiguration.getDocumentIdField()).thenReturn(null); - when(indexConfiguration.getRouting()).thenReturn(null); - when(indexConfiguration.getActions()).thenReturn(null); - when(indexConfiguration.getDocumentRootKey()).thenReturn(null); - lenient().when(indexConfiguration.getVersionType()).thenReturn(null); - lenient().when(indexConfiguration.getVersionExpression()).thenReturn(null); - lenient().when(indexConfiguration.getIndexAlias()).thenReturn(UUID.randomUUID().toString()); - lenient().when(indexConfiguration.getTemplateType()).thenReturn(TemplateType.V1); - when(indexConfiguration.getIndexType()).thenReturn(IndexType.CUSTOM); - when(indexConfiguration.getBulkSize()).thenReturn(DEFAULT_BULK_SIZE); - when(indexConfiguration.getFlushTimeout()).thenReturn(DEFAULT_FLUSH_TIMEOUT); - - when(openSearchSinkConfiguration.getIndexConfiguration()).thenReturn(indexConfiguration); - when(openSearchSinkConfiguration.getRetryConfiguration()).thenReturn(retryConfiguration); - lenient().when(openSearchSinkConfiguration.getConnectionConfiguration()).thenReturn(connectionConfiguration); - - when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(mock(Counter.class)); - when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(mock(Timer.class)); - when(pluginMetrics.timer(INTERNAL_LATENCY)).thenReturn(mock(Timer.class)); - when(pluginMetrics.timer(EXTERNAL_LATENCY)).thenReturn(mock(Timer.class)); - when(pluginMetrics.timer(BULKREQUEST_LATENCY)).thenReturn(mock(Timer.class)); - when(pluginMetrics.counter(BULKREQUEST_ERRORS)).thenReturn(mock(Counter.class)); - when(pluginMetrics.counter(INVALID_ACTION_ERRORS)).thenReturn(mock(Counter.class)); - when(pluginMetrics.counter(DYNAMIC_INDEX_DROPPED_EVENTS)).thenReturn(mock(Counter.class)); - when(pluginMetrics.counter(INVALID_VERSION_EXPRESSION_DROPPED_EVENTS)).thenReturn(mock(Counter.class)); - when(pluginMetrics.summary(BULKREQUEST_SIZE_BYTES)).thenReturn(mock(DistributionSummary.class)); - - lenient().when(sinkContext.getTagsTargetKey()).thenReturn(null); - lenient().when(sinkContext.getIncludeKeys()).thenReturn(null); - lenient().when(sinkContext.getExcludeKeys()).thenReturn(null); + private BulkOperationFactory createFactory(final ScriptConfiguration scriptConfig) { + final ScriptManager scriptManager = new ScriptManager(scriptConfig, null); + return new BulkOperationFactory(null, scriptManager, new ObjectMapper(), false); } - private void configureScript(final String source) { - configureScript(source, null); - } - - private void configureScript(final String source, final Map params) { - when(scriptConfiguration.getSource()).thenReturn(source); - lenient().when(scriptConfiguration.getParams()).thenReturn(params); - when(indexConfiguration.getScriptConfiguration()).thenReturn(scriptConfiguration); - } - - private void configureAction(final String action) { - when(indexConfiguration.getAction()).thenReturn(action); - } - - private OpenSearchSink createObjectUnderTest() throws IOException { - try (final MockedStatic configMockedStatic = mockStatic(OpenSearchSinkConfiguration.class); - final MockedStatic metricsMockedStatic = mockStatic(PluginMetrics.class); - final MockedConstruction ignored = mockConstruction(IndexManagerFactory.class, (mock, context) -> { - indexManagerFactory = mock; - })) { - metricsMockedStatic.when(() -> PluginMetrics.fromPluginSetting(pluginSetting)).thenReturn(pluginMetrics); - configMockedStatic.when(() -> OpenSearchSinkConfiguration.readOSConfig(openSearchSinkConfig, expressionEvaluator)) - .thenReturn(openSearchSinkConfiguration); - return new OpenSearchSink( - pluginSetting, sinkContext, expressionEvaluator, awsCredentialsSupplier, pipelineDescription, pluginConfigObservable, openSearchSinkConfig); - } + private ScriptConfiguration mockScript(final String source, final Map params) { + final ScriptConfiguration config = mock(ScriptConfiguration.class); + lenient().when(config.getSource()).thenReturn(source); + lenient().when(config.getParams()).thenReturn(params); + return config; } private JsonNode serializeBody(final UpdateOperation updateOp) throws IOException { @@ -178,14 +62,11 @@ private JsonNode serializeBody(final UpdateOperation updateOp) throws IOExcep @Test void script_sets_script_source_and_lang() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.counter += 1"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.counter += 1", null)); final ObjectNode jsonNode = objectMapper.createObjectNode().put("counter", 0); final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"counter\":0}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -195,14 +76,11 @@ void script_sets_script_source_and_lang() throws IOException { @Test void script_always_passes_event_as_params_doc() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.putAll(params.doc)"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.putAll(params.doc)", null)); final ObjectNode jsonNode = objectMapper.createObjectNode().put("price", 9.99).put("currency", "USD"); final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -214,10 +92,7 @@ void script_always_passes_event_as_params_doc() throws IOException { @Test void script_merges_resolved_params_alongside_doc() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.counter += params.increment", Map.of("increment", 5)); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.counter += params.increment", Map.of("increment", 5))); final ObjectNode jsonNode = objectMapper.createObjectNode().put("counter", 0); final SerializedJson document = SerializedJson.builder() .withJsonString("{}") @@ -225,7 +100,7 @@ void script_merges_resolved_params_alongside_doc() throws IOException { .withResolvedScriptParameters(Map.of("increment", 5)) .build(); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -236,14 +111,11 @@ void script_merges_resolved_params_alongside_doc() throws IOException { @Test void script_always_sets_scripted_upsert_true() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.counter += 1"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.counter += 1", null)); final ObjectNode jsonNode = objectMapper.createObjectNode(); final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -252,14 +124,11 @@ void script_always_sets_scripted_upsert_true() throws IOException { @Test void script_sets_upsert_body() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.counter += 1"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.counter += 1", null)); final ObjectNode jsonNode = objectMapper.createObjectNode().put("counter", 0); final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"counter\":0}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -268,31 +137,26 @@ void script_sets_upsert_body() throws IOException { @Test void script_works_with_update_action() throws IOException { - configureAction(OpenSearchBulkActions.UPDATE.toString()); - configureScript("ctx._source.status = params.doc.status"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.status = params.doc.status", null)); final ObjectNode jsonNode = objectMapper.createObjectNode().put("status", "active"); final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPDATE.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); assertThat(body.get("script"), notNullValue()); - assertThat(body.get("scripted_upsert").asBoolean(), equalTo(true)); + assertThat(body.has("scripted_upsert"), equalTo(false)); + assertThat(body.has("upsert"), equalTo(false)); } @Test void script_sets_document_id_and_routing() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - configureScript("ctx._source.counter += 1"); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(mockScript("ctx._source.counter += 1", null)); final ObjectNode jsonNode = objectMapper.createObjectNode(); final SerializedJson document = SerializedJson.fromStringAndOptionals("{}", "my-doc-id", "my-route", null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); assertThat(result.update().id(), equalTo("my-doc-id")); @@ -301,14 +165,11 @@ void script_sets_document_id_and_routing() throws IOException { @Test void without_script_upsert_preserves_original_behavior() throws IOException { - configureAction(OpenSearchBulkActions.UPSERT.toString()); - when(indexConfiguration.getScriptConfiguration()).thenReturn(null); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(null); final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "test"); final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"test\"}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPSERT.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update()); @@ -319,14 +180,11 @@ void without_script_upsert_preserves_original_behavior() throws IOException { @Test void without_script_update_preserves_original_behavior() throws IOException { - configureAction(OpenSearchBulkActions.UPDATE.toString()); - when(indexConfiguration.getScriptConfiguration()).thenReturn(null); - final OpenSearchSink sink = createObjectUnderTest(); - + final BulkOperationFactory factory = createFactory(null); final ObjectNode jsonNode = objectMapper.createObjectNode().put("name", "test"); final SerializedJson document = SerializedJson.fromStringAndOptionals("{\"name\":\"test\"}", "doc-1", null, null); - final BulkOperation result = sink.getBulkOperationForAction( + final BulkOperation result = factory.create( OpenSearchBulkActions.UPDATE.toString(), document, null, "test-index", jsonNode); final JsonNode body = serializeBody(result.update());