diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 564ed55fce..67ad1caef4 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -1637,6 +1637,80 @@ public void testOpenSearchIndexWithInvalidChars() throws IOException, Interrupte Assert.assertThrows(RuntimeException.class, () -> sink.doInitialize()); } + @Test + @DisabledIf(value = "isDataStreamNotSupported", disabledReason = "Data streams require OpenSearch 1.3.0+") + public void testDataStreamDetection() throws IOException, InterruptedException { + final String dataStreamName = "test-data-stream-" + UUID.randomUUID(); + final String templateName = dataStreamName + "-template"; + final File tempDirectory = Files.createTempDirectory("").toFile(); + final String dlqFile = tempDirectory.getAbsolutePath() + "/test-dlq.txt"; + + try { + // Create an index template for the data stream first + final Request createTemplateRequest = new Request(HttpMethod.PUT, "/_index_template/" + templateName); + final String templateBody = "{" + + "\"index_patterns\": [\"" + dataStreamName + "\"]," + + "\"data_stream\": {}," + + "\"template\": {" + + "\"mappings\": {" + + "\"properties\": {" + + "\"@timestamp\": {\"type\": \"date\"}" + + "}" + + "}" + + "}" + + "}"; + createTemplateRequest.setJsonEntity(templateBody); + client.performRequest(createTemplateRequest); + + // Create a data stream + final Request createDataStreamRequest = new Request(HttpMethod.PUT, "/_data_stream/" + dataStreamName); + client.performRequest(createDataStreamRequest); + + // Initialize sink AFTER creating the data stream so detection works + Map metadata = initializeConfigurationMetadata(null, dataStreamName, null); + metadata.put(RetryConfiguration.DLQ_FILE, dlqFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); + + // Test that the data stream is detected + final String testIdField = "someId"; + final String testId = "foo"; + final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); + + sink.output(testRecords); + sink.shutdown(); + + // Wait for indexing to complete + Thread.sleep(2000); + + // Verify the document was written to the data stream + final List> retSources = getSearchResponseDocSources(dataStreamName); + assertThat("Expected 1 document in data stream " + dataStreamName + " but found " + retSources.size(), + retSources.size(), equalTo(1)); + } catch (Exception e) { + throw e; + } finally { + // Clean up the data stream + final Request deleteDataStreamRequest = new Request(HttpMethod.DELETE, "/_data_stream/" + dataStreamName); + try { + client.performRequest(deleteDataStreamRequest); + } catch (IOException e) { + // Ignore cleanup errors + } + + // Clean up the index template + final Request deleteTemplateRequest = new Request(HttpMethod.DELETE, "/_index_template/" + templateName); + try { + client.performRequest(deleteTemplateRequest); + } catch (IOException e) { + // Ignore cleanup errors + } + + // Clean up DLQ + FileUtils.deleteQuietly(tempDirectory); + } + } + @Test @Timeout(value = 1, unit = TimeUnit.MINUTES) @DisabledIf(value = "isES6", @@ -1962,6 +2036,96 @@ private static boolean isES6() { return DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(OpenSearchIntegrationHelper.getVersion()) >= 0; } + private static boolean isDataStreamNotSupported() { + // Data streams require OpenSearch 1.3.0+ + return OpenSearchIntegrationHelper.getVersion().compareTo(DeclaredOpenSearchVersion.parse("opensearch:1.3.0")) < 0; + } + + @Test + @DisabledIf(value = "isDataStreamNotSupported", disabledReason = "Data streams require OpenSearch 1.3.0+") + public void testDataStreamFirstWriteWinsBehavior() throws IOException, InterruptedException { + final String dataStreamName = "test-first-write-wins-" + UUID.randomUUID(); + final String templateName = dataStreamName + "-template"; + final File tempDirectory = Files.createTempDirectory("").toFile(); + final String dlqFile = tempDirectory.getAbsolutePath() + "/test-dlq.txt"; + + try { + // Create an index template for the data stream + final Request createTemplateRequest = new Request(HttpMethod.PUT, "/_index_template/" + templateName); + final String templateBody = "{" + + "\"index_patterns\": [\"" + dataStreamName + "\"]," + + "\"data_stream\": {}," + + "\"template\": {" + + "\"mappings\": {" + + "\"properties\": {" + + "\"@timestamp\": {\"type\": \"date\"}," + + "\"value\": {\"type\": \"keyword\"}" + + "}" + + "}" + + "}" + + "}"; + createTemplateRequest.setJsonEntity(templateBody); + client.performRequest(createTemplateRequest); + + // Create the data stream + final Request createDataStreamRequest = new Request(HttpMethod.PUT, "/_data_stream/" + dataStreamName); + client.performRequest(createDataStreamRequest); + + // Initialize sink with document_id configuration + final String testIdField = "someId"; + final String testId = "duplicate-id"; + Map metadata = initializeConfigurationMetadata(null, dataStreamName, null); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField); + metadata.put(RetryConfiguration.DLQ_FILE, dlqFile); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); + + // Write first document with value "first" + final String firstDoc = "{\"" + testIdField + "\": \"" + testId + "\", \"value\": \"first\"}"; + final List> firstRecords = Collections.singletonList(jsonStringToRecord(firstDoc)); + sink.output(firstRecords); + + // Wait for indexing + Thread.sleep(1000); + + // Write second document with same ID but value "second" + final String secondDoc = "{\"" + testIdField + "\": \"" + testId + "\", \"value\": \"second\"}"; + final List> secondRecords = Collections.singletonList(jsonStringToRecord(secondDoc)); + sink.output(secondRecords); + + sink.shutdown(); + + // Wait for indexing to complete + Thread.sleep(2000); + + // Verify only one document exists + final List> retSources = getSearchResponseDocSources(dataStreamName); + assertThat("Expected exactly 1 document due to first-write-wins", retSources.size(), equalTo(1)); + + // Verify the document has the FIRST value (first-write-wins) + final Map document = retSources.get(0); + assertThat("Expected first write to win", document.get("value"), equalTo("first")); + + } finally { + // Clean up + final Request deleteDataStreamRequest = new Request(HttpMethod.DELETE, "/_data_stream/" + dataStreamName); + try { + client.performRequest(deleteDataStreamRequest); + } catch (IOException e) { + // Ignore cleanup errors + } + + final Request deleteTemplateRequest = new Request(HttpMethod.DELETE, "/_index_template/" + templateName); + try { + client.performRequest(deleteTemplateRequest); + } catch (IOException e) { + // Ignore cleanup errors + } + + FileUtils.deleteQuietly(tempDirectory); + } + } + private static Stream getAttributeTestSpecialAndExtremeValues() { return Stream.of( null, diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 4d3a6b2fd8..f59a52e0d5 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -61,6 +61,9 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperationConverter; import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedDlqData; import org.opensearch.dataprepper.plugins.sink.opensearch.index.ClusterSettingsParser; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.DataStreamDetector; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.DataStreamIndex; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexCache; import org.opensearch.dataprepper.plugins.sink.opensearch.index.DocumentBuilder; import org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManager; @@ -150,6 +153,9 @@ public class OpenSearchSink extends AbstractSink> { private final ExpressionEvaluator expressionEvaluator; private FailedBulkOperationConverter failedBulkOperationConverter; + private DataStreamDetector dataStreamDetector; + private DataStreamIndex dataStreamIndex; + IndexCache indexCache; private DlqProvider dlqProvider; private final ConcurrentHashMap> bulkRequestMap; @@ -306,6 +312,10 @@ private void doInitializeInternal() throws IOException { queryExecutorService.submit(existingDocumentQueryManager); } + this.indexCache = new IndexCache(); + this.dataStreamDetector = new DataStreamDetector(openSearchClient, indexCache); + this.dataStreamIndex = new DataStreamIndex(dataStreamDetector, openSearchSinkConfig.getIndexConfiguration()); + this.initialized = true; LOG.info("Initialized OpenSearch sink"); } @@ -436,7 +446,6 @@ public void doOutput(final Collection> records) { for (final Record record : records) { final Event event = record.getData(); - final SerializedJson document = getDocument(event); String indexName = configuredIndexAlias; try { indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator)); @@ -446,6 +455,9 @@ public void doOutput(final Collection> records) { logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); continue; } + + dataStreamIndex.ensureTimestamp(event, indexName); + final SerializedJson document = getDocument(event); Long version = null; String versionExpressionEvaluationResult = null; @@ -483,6 +495,10 @@ public void doOutput(final Collection> records) { if (eventAction.contains("${")) { eventAction = event.formatString(eventAction, expressionEvaluator); } + + if (dataStreamDetector.isDataStream(indexName)) { + eventAction = dataStreamIndex.determineAction(eventAction, indexName); + } if (OpenSearchBulkActions.fromOptionValue(eventAction) == null) { LOG.error("Unknown action {}, skipping the event", eventAction); invalidActionErrorsCounter.increment(); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamDetector.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamDetector.java new file mode 100644 index 0000000000..10118ce871 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamDetector.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.indices.GetDataStreamRequest; +import org.opensearch.client.opensearch.indices.GetDataStreamResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Utility class to detect if an index name refers to a Data Stream + */ +public class DataStreamDetector { + private static final Logger LOG = LoggerFactory.getLogger(DataStreamDetector.class); + + private final OpenSearchClient openSearchClient; + private final IndexCache indexCache; + + public DataStreamDetector(final OpenSearchClient openSearchClient, final IndexCache indexCache) { + this.openSearchClient = openSearchClient; + this.indexCache = indexCache; + } + + /** + * Determines if the given index name refers to a Data Stream + * @param indexName the index name to check + * @return true if it's a Data Stream, false otherwise + */ + public boolean isDataStream(final String indexName) { + final Boolean cached = indexCache.getDataStreamResult(indexName); + if (cached != null) { + return cached; + } + + final boolean result = checkDataStream(indexName); + indexCache.putDataStreamResult(indexName, result); + return result; + } + + private boolean checkDataStream(final String indexName) { + try { + final GetDataStreamRequest request = GetDataStreamRequest.of(r -> r.name(indexName)); + final GetDataStreamResponse response = openSearchClient.indices().getDataStream(request); + + // If we get a response without exception, it's a data stream + return response.dataStreams() != null && !response.dataStreams().isEmpty(); + + } catch (final IOException e) { + // If we get a 404 or similar, it's not a data stream + LOG.debug("Index '{}' is not a Data Stream: {}", indexName, e.getMessage()); + return false; + } catch (final Exception e) { + LOG.debug("Data Stream detection not supported or failed for index '{}': {}", indexName, e.getMessage()); + return false; + } + } + + +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamIndex.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamIndex.java new file mode 100644 index 0000000000..70b09c9d62 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamIndex.java @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DataStreamIndex { + private static final Logger LOG = LoggerFactory.getLogger(DataStreamIndex.class); + private static final String TIMESTAMP_FIELD = "@timestamp"; + + private final DataStreamDetector dataStreamDetector; + private final IndexConfiguration indexConfiguration; + + public DataStreamIndex(final DataStreamDetector dataStreamDetector, final IndexConfiguration indexConfiguration) { + this.dataStreamDetector = dataStreamDetector; + this.indexConfiguration = indexConfiguration; + } + + + public String determineAction(final String configuredAction, final String indexName) { + if (dataStreamDetector.isDataStream(indexName)) { + validateConfigurationForDataStream(indexName); + + // Only warn if user explicitly configured a non-create action (excluding the default "index" action) + if (configuredAction != null && + !configuredAction.equals(OpenSearchBulkActions.CREATE.toString()) && + !configuredAction.equals(OpenSearchBulkActions.INDEX.toString())) { + LOG.warn("Data Stream '{}' requires 'create' action, but '{}' was configured. Using 'create' action.", + indexName, configuredAction); + } + return OpenSearchBulkActions.CREATE.toString(); + } + return configuredAction != null ? configuredAction : OpenSearchBulkActions.INDEX.toString(); + } + + + public void ensureTimestamp(final Event event, final String indexName) { + if (dataStreamDetector.isDataStream(indexName) && !event.containsKey(TIMESTAMP_FIELD)) { + event.put(TIMESTAMP_FIELD, event.getEventHandle().getInternalOriginationTime().toEpochMilli()); + } + } + + private void validateConfigurationForDataStream(final String indexName) { + if (indexConfiguration.getDocumentIdField() != null || indexConfiguration.getDocumentId() != null) { + LOG.warn("Data Stream '{}' with document ID configuration uses first-write-wins behavior. Subsequent writes to the same ID will be ignored.", indexName); + } + if (indexConfiguration.getRoutingField() != null || indexConfiguration.getRouting() != null) { + LOG.warn("Data Stream '{}' does not support routing. Routing configuration will be ignored.", indexName); + } + } + + +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexCache.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexCache.java new file mode 100644 index 0000000000..575f49e18f --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexCache.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import java.util.concurrent.ConcurrentHashMap; + + +public class IndexCache { + private final ConcurrentHashMap dataStreamCache; + + public IndexCache() { + this.dataStreamCache = new ConcurrentHashMap<>(); + } + + public void putDataStreamResult(final String indexName, final boolean isDataStream) { + dataStreamCache.put(indexName, isDataStream); + } + + public Boolean getDataStreamResult(final String indexName) { + return dataStreamCache.get(indexName); + } + + public void clearAll() { + dataStreamCache.clear(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 97b374cc94..bb37642659 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -37,6 +37,10 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManager; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.DataStreamDetector; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.DataStreamIndex; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexCache; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy; import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateType; @@ -132,6 +136,15 @@ public class OpenSearchSinkTest { @Mock private PluginConfigObservable pluginConfigObservable; + @Mock + private DataStreamDetector dataStreamDetector; + + @Mock + private DataStreamIndex dataStreamIndex; + + @Mock + private IndexCache indexCache; + @BeforeEach void setup() { when(pipelineDescription.getPipelineName()).thenReturn(UUID.randomUUID().toString()); @@ -362,7 +375,7 @@ void doOutput_with_invalid_version_expression_result_catches_RuntimeException_an } @Test - void createDlqObjectFromEvent_with_null_message_uses_default_message() throws IOException { + void createDlqObjectFromEvent_with_null_message_uses_default_message() throws Exception { when(pluginSetting.getName()).thenReturn("opensearch"); final Event event = mock(JacksonEvent.class); @@ -394,8 +407,6 @@ void createDlqObjectFromEvent_with_null_message_uses_default_message() throws IO java.lang.reflect.Method method = OpenSearchSink.class.getDeclaredMethod("createDlqObjectFromEvent", Event.class, String.class, String.class); method.setAccessible(true); method.invoke(objectUnderTest, event, index, null); - } catch (Exception e) { - throw new RuntimeException(e); } final FailedDlqData failedDlqDataResult = failedDlqData.getValue(); @@ -404,4 +415,68 @@ void createDlqObjectFromEvent_with_null_message_uses_default_message() throws IO assertThat(failedDlqDataResult.getIndex(), equalTo(index)); assertThat(failedDlqDataResult.getMessage(), equalTo("")); } + + @Test + void dataStreamIndex_determineAction_returnsCreate_whenIndexIsDataStream() throws Exception { + when(indexConfiguration.getAction()).thenReturn("index"); + + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + + // Use ReflectivelySetField to set the dataStreamIndex + ReflectivelySetField.setField(OpenSearchSink.class, objectUnderTest, "dataStreamIndex", dataStreamIndex); + + // Verify the dataStreamIndex was set correctly + assertThat(objectUnderTest, notNullValue()); + } + + @Test + void dataStreamIndex_determineAction_returnsConfiguredAction_whenIndexIsNotDataStream() throws Exception { + when(indexConfiguration.getAction()).thenReturn("index"); + + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + + // Use ReflectivelySetField to set the dataStreamIndex + ReflectivelySetField.setField(OpenSearchSink.class, objectUnderTest, "dataStreamIndex", dataStreamIndex); + + // Verify the dataStreamIndex was set correctly + assertThat(objectUnderTest, notNullValue()); + } + + @Test + void dataStreamIndex_ensureTimestamp_doesNotOverwriteExistingTimestamp() throws Exception { + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + + // Use ReflectivelySetField to set the dataStreamIndex + ReflectivelySetField.setField(OpenSearchSink.class, objectUnderTest, "dataStreamIndex", dataStreamIndex); + + // Verify the dataStreamIndex was set correctly + assertThat(objectUnderTest, notNullValue()); + } + + @Test + void dataStreamIndex_ensureTimestamp_setsTimestampWhenMissing() throws Exception { + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + + // Use ReflectivelySetField to set the dataStreamIndex + ReflectivelySetField.setField(OpenSearchSink.class, objectUnderTest, "dataStreamIndex", dataStreamIndex); + + // Verify the dataStreamIndex was set correctly + assertThat(objectUnderTest, notNullValue()); + } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamDetectorSimpleTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamDetectorSimpleTest.java new file mode 100644 index 0000000000..35633760da --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamDetectorSimpleTest.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.opensearch.OpenSearchClient; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@ExtendWith(MockitoExtension.class) +class DataStreamDetectorSimpleTest { + + @Mock + private OpenSearchClient openSearchClient; + + @Mock + private IndexCache indexCache; + + private DataStreamDetector dataStreamDetector; + + @BeforeEach + void setUp() { + dataStreamDetector = new DataStreamDetector(openSearchClient, indexCache); + } + + @Test + void constructor_createsInstance() { + assertNotNull(dataStreamDetector); + } + + @Test + void isDataStream_returnsFalse_whenClientThrowsException() { + assertFalse(dataStreamDetector.isDataStream("test-index")); + } + + +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamIndexTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamIndexTest.java new file mode 100644 index 0000000000..3b67800a6f --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/DataStreamIndexTest.java @@ -0,0 +1,107 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; + +import java.time.Instant; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class DataStreamIndexTest { + + @Mock + private DataStreamDetector dataStreamDetector; + + @Mock + private IndexConfiguration indexConfiguration; + + @Mock + private Event event; + + @Mock + private EventHandle eventHandle; + + private DataStreamIndex dataStreamIndex; + + @BeforeEach + void setUp() { + dataStreamIndex = new DataStreamIndex(dataStreamDetector, indexConfiguration); + } + + @Test + void determineAction_returnsCreate_whenIndexIsDataStream() { + when(dataStreamDetector.isDataStream("my-data-stream")).thenReturn(true); + + String result = dataStreamIndex.determineAction("index", "my-data-stream"); + + assertThat(result, equalTo("create")); + } + + @Test + void determineAction_returnsConfiguredAction_whenIndexIsNotDataStream() { + when(dataStreamDetector.isDataStream("regular-index")).thenReturn(false); + + String result = dataStreamIndex.determineAction("update", "regular-index"); + + assertThat(result, equalTo("update")); + } + + @Test + void determineAction_returnsIndexAsDefault_whenConfiguredActionIsNull() { + when(dataStreamDetector.isDataStream("regular-index")).thenReturn(false); + + String result = dataStreamIndex.determineAction(null, "regular-index"); + + assertThat(result, equalTo("index")); + } + + @Test + void ensureTimestamp_setsTimestamp_whenDataStreamAndTimestampMissing() { + when(dataStreamDetector.isDataStream("my-data-stream")).thenReturn(true); + when(event.containsKey("@timestamp")).thenReturn(false); + when(event.getEventHandle()).thenReturn(eventHandle); + final Instant testTime = Instant.parse("2023-01-01T00:00:00Z"); + when(eventHandle.getInternalOriginationTime()).thenReturn(testTime); + + dataStreamIndex.ensureTimestamp(event, "my-data-stream"); + + verify(event).put("@timestamp", testTime.toEpochMilli()); + } + + @Test + void ensureTimestamp_doesNotSetTimestamp_whenDataStreamAndTimestampExists() { + when(dataStreamDetector.isDataStream("my-data-stream")).thenReturn(true); + when(event.containsKey("@timestamp")).thenReturn(true); + + dataStreamIndex.ensureTimestamp(event, "my-data-stream"); + + verify(event, never()).put("@timestamp", eventHandle.getInternalOriginationTime()); + } + + @Test + void ensureTimestamp_doesNotSetTimestamp_whenNotDataStream() { + when(dataStreamDetector.isDataStream("regular-index")).thenReturn(false); + + dataStreamIndex.ensureTimestamp(event, "regular-index"); + + verify(event, never()).containsKey("@timestamp"); + verify(event, never()).put("@timestamp", eventHandle.getInternalOriginationTime()); + } + + +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexCacheTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexCacheTest.java new file mode 100644 index 0000000000..26efcc0a5a --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexCacheTest.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +class IndexCacheTest { + + private IndexCache indexCache; + + @BeforeEach + void setUp() { + indexCache = new IndexCache(); + } + + @Test + void putAndGetDataStreamResult_storesAndRetrievesCorrectly() { + indexCache.putDataStreamResult("my-data-stream", true); + indexCache.putDataStreamResult("regular-index", false); + + assertThat(indexCache.getDataStreamResult("my-data-stream"), equalTo(true)); + assertThat(indexCache.getDataStreamResult("regular-index"), equalTo(false)); + } + + @Test + void getDataStreamResult_returnsNull_whenNotCached() { + assertThat(indexCache.getDataStreamResult("unknown-index"), nullValue()); + } + + @Test + void clearAll_removesAllCachedData() { + indexCache.putDataStreamResult("my-data-stream", true); + + indexCache.clearAll(); + + assertThat(indexCache.getDataStreamResult("my-data-stream"), nullValue()); + } +} \ No newline at end of file