diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java new file mode 100644 index 0000000000..9b6a3ba01a --- /dev/null +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java @@ -0,0 +1,506 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.Measurement; +import org.apache.http.util.EntityUtils; +import org.junit.Assert; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.DisabledIf; +import org.mockito.Mock; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.RestClient; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.metrics.MetricsTestUtil; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.metric.DefaultQuantile; +import 
org.opensearch.dataprepper.model.metric.JacksonGauge; +import org.opensearch.dataprepper.model.metric.JacksonHistogram; +import org.opensearch.dataprepper.model.metric.JacksonSum; +import org.opensearch.dataprepper.model.metric.JacksonSummary; +import org.opensearch.dataprepper.model.metric.Quantile; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; + +import javax.ws.rs.HttpMethod; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.apache.http.HttpStatus.SC_OK; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.getHosts; +import static 
org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.waitForClusterStateUpdatesToFinish; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.wipeAllTemplates; + +/** + * Integration tests for the OpenSearch sink with {@code index_type: tsdb}. + * + *

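<p>These tests require a running OpenSearch cluster. Configure via system properties: + *
<ul> + * <li>{@code tests.opensearch.user} and {@code tests.opensearch.password}: optional credentials; the sink's {@code authentication} block is only configured when both are set</li> + * <li>the cluster hosts and the declared distribution version are obtained through {@code OpenSearchIntegrationHelper.getHosts()} and {@code OpenSearchIntegrationHelper.getVersion()}</li> + * </ul>
+ * + *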

Note: TSDB-specific settings ({@code tsdb_engine.enabled}, {@code tsdb_store}) require the TSDB plugin + * installed on the OpenSearch cluster. These tests validate the sink's document conversion and indexing + * behavior using the standard TSDB index template. If the TSDB plugin is not installed, index creation + * will use the template mappings but TSDB-specific settings will be ignored by OpenSearch. + */ +class OpenSearchSinkTsdbIT { + + private static final String AUTHENTICATION = "authentication"; + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; + private static final String PLUGIN_NAME = "opensearch"; + private static final String PIPELINE_NAME = "tsdbIntegTestPipeline"; + private static final String INCLUDE_TYPE_NAME_FALSE_URI = "?include_type_name=false"; + private static final String TEST_TIME = "2024-02-02T10:30:00Z"; + + private RestClient client; + private final List sinksToShutdown = new ArrayList<>(); + private ObjectMapper objectMapper; + + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + @Mock + private ExpressionEvaluator expressionEvaluator; + @Mock + private PipelineDescription pipelineDescription; + @Mock + private PluginSetting pluginSetting; + @Mock + private PluginConfigObservable pluginConfigObservable; + + @BeforeEach + void setup() { + pluginConfigObservable = mock(PluginConfigObservable.class); + expressionEvaluator = mock(ExpressionEvaluator.class); + pipelineDescription = mock(PipelineDescription.class); + pluginSetting = mock(PluginSetting.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); + when(expressionEvaluator.isValidExpressionStatement(any(String.class))).thenReturn(false); + objectMapper = new ObjectMapper(); + } + + @BeforeEach + void metricsInit() throws IOException { + MetricsTestUtil.initMetrics(); + client = createOpenSearchClient(); + } + + @AfterEach + void cleanOpenSearch() throws Exception { + wipeAllOpenSearchIndices(); + 
wipeAllTemplates(); + waitForClusterStateUpdatesToFinish(); + } + + @AfterEach + void shutdownSinks() { + for (final OpenSearchSink sink : sinksToShutdown) { + sink.shutdown(); + } + sinksToShutdown.clear(); + } + + @AfterEach + void closeClient() throws IOException { + if (client != null) { + client.close(); + } + } + + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testInstantiateSinkTsdbDefault() throws IOException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + assertThat(indexAlias, equalTo("metrics-tsdb-v1")); + + final Request request = new Request(HttpMethod.HEAD, indexAlias); + final Response response = client.performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + + final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? 
INCLUDE_TYPE_NAME_FALSE_URI : ""; + final Request mappingRequest = new Request(HttpMethod.GET, indexAlias + "/_mappings" + extraURI); + final Response mappingResponse = client.performRequest(mappingRequest); + final String mappingBody = EntityUtils.toString(mappingResponse.getEntity()); + @SuppressWarnings("unchecked") + final Map mappingResult = createContentParser(XContentType.JSON.xContent(), mappingBody).map(); + final String actualIndex = mappingResult.keySet().iterator().next(); + @SuppressWarnings("unchecked") + final Map mappings = (Map) ((Map) mappingResult.get(actualIndex)).get("mappings"); + assertThat(mappings, notNullValue()); + + @SuppressWarnings("unchecked") + final Map properties = (Map) mappings.get("properties"); + assertThat(properties, notNullValue()); + assertThat(properties.containsKey("labels"), equalTo(true)); + assertThat(properties.containsKey("timestamp"), equalTo(true)); + assertThat(properties.containsKey("value"), equalTo(true)); + + } + + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testOutputGauge() throws IOException, InterruptedException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("cpu_temp") + .withValue(72.5) + .withTime(TEST_TIME) + .withAttributes(Map.of("host", "server-01")) + .withEventKind("GAUGE") + .build(); + + sink.output(List.of(new Record<>(gauge))); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + final List> sources = getSearchResponseDocSources(indexAlias); + assertThat(sources, hasSize(1)); + + final Map doc = sources.get(0); + assertThat(doc.get("labels"), equalTo("__name__ cpu_temp host server-01")); + assertThat(((Number) doc.get("value")).doubleValue(), closeTo(72.5, 0.001)); + 
assertThat(doc.get("timestamp"), notNullValue()); + + final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); + assertThat(bulkRequestErrors.size(), equalTo(1)); + Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); + } + + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testOutputMonotonicSum() throws IOException, InterruptedException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final JacksonSum sum = JacksonSum.builder() + .withName("http_requests") + .withValue(100.0) + .withIsMonotonic(true) + .withTime(TEST_TIME) + .withAttributes(Map.of("method", "GET")) + .withEventKind("SUM") + .build(); + + sink.output(List.of(new Record<>(sum))); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + final List> sources = getSearchResponseDocSources(indexAlias); + assertThat(sources, hasSize(1)); + + final Map doc = sources.get(0); + assertThat(doc.get("labels"), equalTo("__name__ http_requests_total method GET")); + assertThat(((Number) doc.get("value")).doubleValue(), closeTo(100.0, 0.001)); + + final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); + assertThat(bulkRequestErrors.size(), equalTo(1)); + Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); + } + + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testOutputHistogramExpansion() throws IOException, InterruptedException { + final 
OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final JacksonHistogram histogram = JacksonHistogram.builder() + .withName("request_duration") + .withSum(5.5) + .withCount(20L) + .withBucketCountsList(List.of(5L, 5L, 5L, 5L)) + .withExplicitBoundsList(List.of(0.1, 0.5, 1.0)) + .withTime(TEST_TIME) + .withAttributes(Map.of("method", "GET")) + .withEventKind("HISTOGRAM") + .build(); + + sink.output(List.of(new Record<>(histogram))); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + final List> sources = getSearchResponseDocSources(indexAlias); + assertThat(sources, hasSize(6)); + + final List labels = sources.stream() + .map(s -> (String) s.get("labels")) + .collect(Collectors.toList()); + + assertThat(labels.stream().filter(l -> l.contains("request_duration_bucket")).count(), equalTo(4L)); + assertThat(labels.stream().filter(l -> l.contains("request_duration_count")).count(), equalTo(1L)); + assertThat(labels.stream().filter(l -> l.contains("request_duration_sum")).count(), equalTo(1L)); + + final Map labelToValue = sources.stream() + .collect(Collectors.toMap(s -> (String) s.get("labels"), s -> ((Number) s.get("value")).doubleValue())); + + assertThat(labelToValue.get("__name__ request_duration_bucket le 0.1 method GET"), closeTo(5.0, 0.001)); + assertThat(labelToValue.get("__name__ request_duration_bucket le 0.5 method GET"), closeTo(10.0, 0.001)); + assertThat(labelToValue.get("__name__ request_duration_bucket le 1 method GET"), closeTo(15.0, 0.001)); + assertThat(labelToValue.get("__name__ request_duration_bucket le +Inf method GET"), closeTo(20.0, 0.001)); + assertThat(labelToValue.get("__name__ request_duration_count method GET"), closeTo(20.0, 0.001)); + assertThat(labelToValue.get("__name__ request_duration_sum method GET"), closeTo(5.5, 0.001)); + + final List documentsSuccess = 
MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); + assertThat(documentsSuccess.size(), equalTo(1)); + assertThat(documentsSuccess.get(0).getValue(), closeTo(6.0, 0)); + + final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); + assertThat(bulkRequestErrors.size(), equalTo(1)); + Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); + } + + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testOutputSummaryExpansion() throws IOException, InterruptedException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final List quantiles = Arrays.asList( + new DefaultQuantile(0.5, 0.2), + new DefaultQuantile(0.99, 0.8) + ); + final JacksonSummary summary = JacksonSummary.builder() + .withName("rpc_latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(2) + .withCount(1000L) + .withSum(300.5) + .withTime(TEST_TIME) + .withAttributes(Map.of("service", "api")) + .withEventKind("SUMMARY") + .build(); + + sink.output(List.of(new Record<>(summary))); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + final List> sources = getSearchResponseDocSources(indexAlias); + assertThat(sources, hasSize(4)); + + final Map labelToValue = sources.stream() + .collect(Collectors.toMap(s -> (String) s.get("labels"), s -> ((Number) s.get("value")).doubleValue())); + + assertThat(labelToValue.get("__name__ rpc_latency quantile 0.5 service api"), closeTo(0.2, 0.001)); + assertThat(labelToValue.get("__name__ rpc_latency quantile 0.99 service api"), 
closeTo(0.8, 0.001)); + assertThat(labelToValue.get("__name__ rpc_latency_count service api"), closeTo(1000.0, 0.001)); + assertThat(labelToValue.get("__name__ rpc_latency_sum service api"), closeTo(300.5, 0.001)); + + final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); + assertThat(bulkRequestErrors.size(), equalTo(1)); + Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); + } + + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testOutputMixedMetricTypes() throws IOException, InterruptedException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("cpu_temp") + .withValue(72.5) + .withTime(TEST_TIME) + .withAttributes(Map.of("host", "server-01")) + .withEventKind("GAUGE") + .build(); + + final JacksonSum counter = JacksonSum.builder() + .withName("requests") + .withValue(500.0) + .withIsMonotonic(true) + .withTime(TEST_TIME) + .withAttributes(Map.of("path", "/api")) + .withEventKind("SUM") + .build(); + + final List> records = List.of(new Record<>(gauge), new Record<>(counter)); + sink.output(records); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + final List> sources = getSearchResponseDocSources(indexAlias); + assertThat(sources, hasSize(2)); + + final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); + assertThat(bulkRequestErrors.size(), equalTo(1)); + Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); + } + + + @Test + 
@DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testReinstantiateSinkDoesNotCreateDuplicateIndex() throws IOException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + createObjectUnderTest(config, true); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + Request request = new Request(HttpMethod.HEAD, indexAlias); + Response response = client.performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + + createObjectUnderTest(config, true); + + request = new Request(HttpMethod.HEAD, indexAlias); + response = client.performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + } + + + private OpenSearchSink createObjectUnderTest(final OpenSearchSinkConfig openSearchSinkConfig, final boolean doInitialize) { + final SinkContext sinkContext = mock(SinkContext.class); + when(sinkContext.getTagsTargetKey()).thenReturn(null); + when(sinkContext.getForwardToPipelines()).thenReturn(Map.of()); + when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME); + when(pluginSetting.getPipelineName()).thenReturn(PIPELINE_NAME); + when(pluginSetting.getName()).thenReturn(PLUGIN_NAME); + final OpenSearchSink sink = new OpenSearchSink( + pluginSetting, sinkContext, expressionEvaluator, awsCredentialsSupplier, + pipelineDescription, pluginConfigObservable, openSearchSinkConfig); + if (doInitialize) { + sink.doInitialize(); + } + sinksToShutdown.add(sink); + return sink; + } + + private OpenSearchSinkConfig generateOpenSearchSinkConfig(final String indexType, final String indexAlias, + final String templateFilePath) throws JsonProcessingException { + final Map metadata = new HashMap<>(); + metadata.put(IndexConfiguration.INDEX_TYPE, indexType); + metadata.put(ConnectionConfiguration.HOSTS, getHosts()); + 
metadata.put(IndexConfiguration.INDEX_ALIAS, indexAlias); + metadata.put(IndexConfiguration.TEMPLATE_FILE, templateFilePath); + metadata.put(IndexConfiguration.FLUSH_TIMEOUT, -1); + metadata.put("insecure", true); + final String user = System.getProperty("tests.opensearch.user"); + final String password = System.getProperty("tests.opensearch.password"); + if (user != null && password != null) { + metadata.put(AUTHENTICATION, Map.of(USERNAME, user, PASSWORD, password)); + } + final String distributionVersion = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? + DistributionVersion.ES6.getVersion() : DistributionVersion.DEFAULT.getVersion(); + metadata.put(IndexConfiguration.DISTRIBUTION_VERSION, distributionVersion); + + final String json = new ObjectMapper().writeValueAsString(metadata); + return objectMapper.readValue(json, OpenSearchSinkConfig.class); + } + + private List> getSearchResponseDocSources(final String index) throws IOException { + final Request refresh = new Request(HttpMethod.POST, index + "/_refresh"); + client.performRequest(refresh); + final Request request = new Request(HttpMethod.GET, index + "/_search"); + final Response response = client.performRequest(request); + final String responseBody = EntityUtils.toString(response.getEntity()); + + @SuppressWarnings("unchecked") + final List hits = (List) ((Map) createContentParser( + XContentType.JSON.xContent(), responseBody).map().get("hits")).get("hits"); + @SuppressWarnings("unchecked") + final List> sources = hits.stream() + .map(hit -> (Map) ((Map) hit).get("_source")) + .collect(Collectors.toList()); + return sources; + } + + private void wipeAllOpenSearchIndices() throws IOException { + final String getIndicesEndpoint = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? 
+ "/*?expand_wildcards=all&include_type_name=false" : "/*?expand_wildcards=all"; + final Response response = client.performRequest(new Request("GET", getIndicesEndpoint)); + final String responseBody = EntityUtils.toString(response.getEntity()); + final Map indexContent = createContentParser(XContentType.JSON.xContent(), responseBody).map(); + final Set indices = indexContent.keySet(); + + indices.stream() + .filter(Objects::nonNull) + .filter(Predicate.not(indexName -> indexName.startsWith(".opendistro-"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".opendistro_"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch-"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch_"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".ql"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".plugins-ml-config"))) + .forEach(indexName -> { + try { + client.performRequest(new Request("DELETE", "/" + indexName)); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + } + + private static boolean isES6() { + return DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(OpenSearchIntegrationHelper.getVersion()) >= 0; + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 7fcae130e9..37671aa3b4 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -32,6 +32,8 @@ import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; +import 
org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException; import org.opensearch.dataprepper.model.failures.DlqObject; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; @@ -72,6 +74,8 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapper; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.CustomDocumentBuilder; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.CustomDocumentBuilderFactory; import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ServerlessOptions; import org.slf4j.Logger; @@ -168,6 +172,8 @@ public class OpenSearchSink extends AbstractSink> { private ExistingDocumentQueryManager existingDocumentQueryManager; + private final CustomDocumentBuilder customDocumentBuilder; + private final ExecutorService queryExecutorService; private final int processWorkerThreads; @@ -218,6 +224,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.pluginConfigObservable = pluginConfigObservable; this.objectMapper = new ObjectMapper(); this.bulkOperationFactory = new BulkOperationFactory(versionType, scriptManager, objectMapper, isUsingDocumentFilters()); + this.customDocumentBuilder = new CustomDocumentBuilderFactory().create(this.indexType); this.queryExecutorService = openSearchSinkConfig.getIndexConfiguration().getQueryTerm() != null ? 
Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("existing-document-query-manager")) : null; @@ -384,6 +391,32 @@ public void doOutput(final Collection> records) { } dataStreamIndex.ensureTimestamp(event, indexName); + + if (customDocumentBuilder != null) { + try { + final List tsdbDocs = customDocumentBuilder.buildDocuments(event); + final String tsdbAction = resolveEventAction(event); + final EventHandle eventHandle = event.getEventHandle(); + if (tsdbDocs.size() > 1 && eventHandle instanceof InternalEventHandle) { + for (int i = 0; i < tsdbDocs.size() - 1; i++) { + ((InternalEventHandle) eventHandle).acquireReference(); + } + } + for (final String tsdbDoc : tsdbDocs) { + final SerializedJson doc = SerializedJson.fromStringAndOptionals(tsdbDoc, null, null, null); + final BulkOperation op = bulkOperationFactory.create(tsdbAction, doc, null, indexName, null); + final BulkOperationWrapper wrapper = new BulkOperationWrapper(op, eventHandle, null, null); + bulkRequest = flushBatch(bulkRequest, wrapper, lastFlushTime); + bulkRequest.addOperation(wrapper); + } + } catch (final Exception e) { + LOG.error("Failed to build TSDB documents for event: {}", e.getMessage(), e); + dynamicIndexDroppedEvents.increment(); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); + } + continue; + } + final SerializedJson document = getDocument(event); Long version = null; @@ -416,20 +449,7 @@ public void doOutput(final Collection> records) { } } - String eventAction = action; - if (actions != null) { - for (final ActionConfiguration actionEntry: actions) { - final String condition = actionEntry.getWhen(); - eventAction = actionEntry.getType(); - if (condition != null && - expressionEvaluator.evaluateConditional(condition, event)) { - break; - } - } - } - if (eventAction.contains("${")) { - eventAction = event.formatString(eventAction, expressionEvaluator); - } + String eventAction = 
resolveEventAction(event); if (dataStreamDetector.isDataStream(indexName)) { eventAction = dataStreamIndex.determineAction(eventAction, indexName); @@ -542,7 +562,7 @@ void successfulOperationsHandler(final List successfulOper if (bulkOperation.getEvent() != null) { bulkOperation.getEvent().getEventHandle().release(true); } else { - bulkOperation.getEventHandle().release(true); + bulkOperation.releaseEventHandle(true); } } return; @@ -699,6 +719,24 @@ private DlqObject createDlqObjectFromEvent(final Event event, return builder.build(); } + private String resolveEventAction(final Event event) { + String resolvedAction = action; + if (actions != null) { + for (final ActionConfiguration actionEntry : actions) { + final String condition = actionEntry.getWhen(); + resolvedAction = actionEntry.getType(); + if (condition != null && + expressionEvaluator.evaluateConditional(condition, event)) { + break; + } + } + } + if (resolvedAction.contains("${")) { + resolvedAction = event.formatString(resolvedAction, expressionEvaluator); + } + return resolvedAction; + } + /** * This function is used for update and upsert bulk actions to determine whether the original JsonNode needs to be filtered down * based on the user's sink configuration. 
If a new parameter manipulates the document before sending to OpenSearch, it needs to be added to diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilder.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilder.java new file mode 100644 index 0000000000..20f268bab4 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilder.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import org.opensearch.dataprepper.model.event.Event; + +import java.util.List; + +/** + * Converts a Data Prepper {@link Event} into one or more JSON document strings + * suitable for indexing into OpenSearch. Implementations handle index-type-specific + * document transformations such as TSDB metric expansion. 
+ */ +public interface CustomDocumentBuilder { + List buildDocuments(Event event); +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilderFactory.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilderFactory.java new file mode 100644 index 0000000000..edf85a58c0 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilderFactory.java @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +public final class CustomDocumentBuilderFactory { + + public CustomDocumentBuilder create(final IndexType indexType) { + if (indexType == IndexType.TSDB) { + return new TSDBDocumentBuilder(); + } + return null; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 9bb73183b4..c55e317952 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -457,6 +457,8 @@ private Map readIndexTemplate(final String templateFile, final I templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_DEFAULT_TEMPLATE_FILE); } else if 
(indexType.equals(IndexType.METRIC_ANALYTICS_PLAIN)) { templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_STANDARD_TEMPLATE_FILE); + } else if (indexType.equals(IndexType.TSDB)) { + templateURL = loadExistingTemplate(templateType, IndexConstants.TSDB_DEFAULT_TEMPLATE_FILE); } else if (templateFile != null) { if (templateFile.toLowerCase().startsWith(S3_PREFIX)) { FileReader s3FileReader = new S3FileReader(s3Client); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java index 2df48191e7..08e2be0e4e 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java @@ -42,6 +42,8 @@ public class IndexConstants { public static final String OTEL_APM_SERVICE_MAP_ISM_FILE_NO_ISM_TEMPLATE = "otel-v2-apm-service-map-policy-no-ism-template.json"; public static final String OTEL_APM_SERVICE_MAP_ISM_FILE_WITH_ISM_TEMPLATE = "otel-v2-apm-service-map-policy-with-ism-template.json"; + public static final String TSDB_DEFAULT_TEMPLATE_FILE = "tsdb-index-template.json"; + static { // TODO: extract out version number into version enum TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_SERVICE_MAP, "otel-v1-apm-service-map"); @@ -52,5 +54,6 @@ public class IndexConstants { TYPE_TO_DEFAULT_ALIAS.put(IndexType.LOG_ANALYTICS_PLAIN, "logs-otel-v1"); TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS, "metrics-otel-v1"); TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS_PLAIN, "metrics-otel-v1"); + TYPE_TO_DEFAULT_ALIAS.put(IndexType.TSDB, "metrics-tsdb-v1"); } } diff --git 
a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java index 3977dc398c..bca777d859 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java @@ -20,6 +20,7 @@ public enum IndexType { LOG_ANALYTICS_PLAIN("log-analytics-plain"), METRIC_ANALYTICS("metric-analytics"), METRIC_ANALYTICS_PLAIN("metric-analytics-plain"), + TSDB("tsdb"), CUSTOM("custom"), MANAGEMENT_DISABLED("management_disabled"); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java new file mode 100644 index 0000000000..e994d50bdf --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java @@ -0,0 +1,244 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.metric.Gauge; +import org.opensearch.dataprepper.model.metric.Histogram; +import org.opensearch.dataprepper.model.metric.Metric; +import org.opensearch.dataprepper.model.metric.Quantile; +import org.opensearch.dataprepper.model.metric.Sum; +import org.opensearch.dataprepper.model.metric.Summary; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +public final class TSDBDocumentBuilder implements CustomDocumentBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(TSDBDocumentBuilder.class); + private static final String NAME_LABEL = "__name__"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public List buildDocuments(final Event event) { + if (!(event instanceof Metric)) { + throw new IllegalArgumentException( + "TSDB index_type requires Metric events. Received: " + event.getClass().getName()); + } + + final Metric metric = (Metric) event; + final long timestamp = parseTimestamp(metric.getTime()); + final Map attributes = metric.getAttributes() != null + ? 
metric.getAttributes() : Collections.emptyMap(); + final String[][] sortedAttrs = sortAndSanitizeAttributes(attributes); + + if (metric instanceof Gauge) { + return buildGauge((Gauge) metric, sortedAttrs, timestamp); + } else if (metric instanceof Sum) { + return buildSum((Sum) metric, sortedAttrs, timestamp); + } else if (metric instanceof Histogram) { + return buildHistogram((Histogram) metric, sortedAttrs, timestamp); + } else if (metric instanceof Summary) { + return buildSummary((Summary) metric, sortedAttrs, timestamp); + } + + LOG.warn("Unsupported metric kind '{}' for metric '{}', building single document from value 0.0", + metric.getKind(), metric.getName()); + return List.of(buildJsonDoc(buildLabels(metric.getName(), sortedAttrs, null, null), timestamp, 0.0)); + } + + private static List buildGauge(final Gauge gauge, final String[][] sortedAttrs, final long timestamp) { + final double value = gauge.getValue() != null ? gauge.getValue() : 0.0; + final String labels = buildLabels(gauge.getName(), sortedAttrs, null, null); + return List.of(buildJsonDoc(labels, timestamp, value)); + } + + private static List buildSum(final Sum sum, final String[][] sortedAttrs, final long timestamp) { + final double value = sum.getValue() != null ? sum.getValue() : 0.0; + final String name = sum.isMonotonic() && !sum.getName().endsWith("_total") + ? sum.getName() + "_total" + : sum.getName(); + final String labels = buildLabels(name, sortedAttrs, null, null); + return List.of(buildJsonDoc(labels, timestamp, value)); + } + + private static List buildHistogram(final Histogram histogram, final String[][] sortedAttrs, final long timestamp) { + final String baseName = histogram.getName(); + final List bucketCounts = histogram.getBucketCountsList(); + final List explicitBounds = histogram.getExplicitBoundsList(); + final int bucketSize = (bucketCounts != null && explicitBounds != null) ? 
bucketCounts.size() : 0; + final List documents = new ArrayList<>(bucketSize + 2); + + if (bucketSize > 0) { + final String bucketName = baseName + "_bucket"; + long cumulativeCount = 0; + for (int i = 0; i < bucketCounts.size(); i++) { + cumulativeCount += bucketCounts.get(i); + final String le = i < explicitBounds.size() + ? formatDouble(explicitBounds.get(i)) + : "+Inf"; + final String labels = buildLabels(bucketName, sortedAttrs, "le", le); + documents.add(buildJsonDoc(labels, timestamp, (double) cumulativeCount)); + } + } + + appendCountAndSumDocuments(documents, baseName, histogram.getCount(), histogram.getSum(), sortedAttrs, timestamp); + return documents; + } + + private static List buildSummary(final Summary summary, final String[][] sortedAttrs, final long timestamp) { + final String baseName = summary.getName(); + final List quantiles = summary.getQuantiles(); + final int quantileSize = quantiles != null ? quantiles.size() : 0; + final List documents = new ArrayList<>(quantileSize + 2); + + if (quantiles != null) { + for (final Quantile q : quantiles) { + final double qValue = q.getValue() != null ? q.getValue() : 0.0; + if (!Double.isFinite(qValue)) { + continue; + } + final String labels = buildLabels(baseName, sortedAttrs, "quantile", formatDouble(q.getQuantile())); + documents.add(buildJsonDoc(labels, timestamp, qValue)); + } + } + + appendCountAndSumDocuments(documents, baseName, summary.getCount(), summary.getSum(), sortedAttrs, timestamp); + return documents; + } + + private static void appendCountAndSumDocuments(final List documents, final String baseName, + final Long count, final Double sum, + final String[][] sortedAttrs, final long timestamp) { + documents.add(buildJsonDoc( + buildLabels(baseName + "_count", sortedAttrs, null, null), + timestamp, count != null ? count.doubleValue() : 0.0)); + documents.add(buildJsonDoc( + buildLabels(baseName + "_sum", sortedAttrs, null, null), + timestamp, sum != null ? 
sum : 0.0)); + } + + private static String[][] sortAndSanitizeAttributes(final Map attributes) { + if (attributes == null || attributes.isEmpty()) { + return new String[0][]; + } + final String[][] entries = new String[attributes.size()][]; + int idx = 0; + for (final Map.Entry e : attributes.entrySet()) { + if (e.getValue() != null) { + entries[idx++] = new String[]{sanitizeLabelKey(e.getKey()), sanitizeLabelValue(e.getValue().toString())}; + } + } + final String[][] trimmed = idx == entries.length ? entries : Arrays.copyOf(entries, idx); + Arrays.sort(trimmed, Comparator.comparing(a -> a[0])); + return trimmed; + } + + private static String buildLabels(final String metricName, final String[][] sortedAttrs, + final String extraKey, final String extraValue) { + final int estimatedSize = 10 + metricName.length() + + sortedAttrs.length * 20 + + (extraKey != null ? extraKey.length() + extraValue.length() + 2 : 0); + final StringBuilder sb = new StringBuilder(estimatedSize); + + sb.append(NAME_LABEL).append(' ').append(sanitizeLabelValue(metricName)); + + boolean extraInserted = (extraKey == null); + for (final String[] attr : sortedAttrs) { + if (!extraInserted && attr[0].equals(extraKey)) { + continue; + } + if (!extraInserted && extraKey.compareTo(attr[0]) < 0) { + sb.append(' ').append(extraKey).append(' ').append(extraValue); + extraInserted = true; + } + sb.append(' ').append(attr[0]).append(' ').append(attr[1]); + } + if (!extraInserted) { + sb.append(' ').append(extraKey).append(' ').append(extraValue); + } + + return sb.toString(); + } + + private static String buildJsonDoc(final String labels, final long timestamp, final double value) { + final ObjectNode node = OBJECT_MAPPER.createObjectNode(); + node.put("labels", labels); + node.put("timestamp", timestamp); + node.put("value", value); + try { + return OBJECT_MAPPER.writeValueAsString(node); + } catch (final JsonProcessingException e) { + throw new RuntimeException("Failed to serialize TSDB document", e); + 
} + } + + private static long parseTimestamp(final String isoTime) { + if (isoTime == null || isoTime.isEmpty()) { + LOG.warn("Metric has no timestamp, using current time"); + return System.currentTimeMillis(); + } + try { + return Instant.parse(isoTime).toEpochMilli(); + } catch (final Exception e) { + LOG.error("Failed to parse timestamp '{}', using current time", isoTime, e); + return System.currentTimeMillis(); + } + } + + private static String sanitizeLabelKey(final String key) { + if (key == null || key.isEmpty()) { + return "_"; + } + final StringBuilder sb = new StringBuilder(key.length() + 1); + final char first = key.charAt(0); + if (!(Character.isLetter(first) || first == '_')) { + sb.append('_'); + } + for (int i = 0; i < key.length(); i++) { + final char c = key.charAt(i); + sb.append(Character.isLetterOrDigit(c) || c == '_' ? c : '_'); + } + return sb.toString(); + } + + private static String sanitizeLabelValue(final String value) { + if (value == null) { + return ""; + } + return value.replace(' ', '_'); + } + + private static String formatDouble(final Double value) { + if (value == null) { + return "0"; + } + if (value == Double.POSITIVE_INFINITY) { + return "+Inf"; + } + if (value == Double.NEGATIVE_INFINITY) { + return "-Inf"; + } + if (value == (long) value.doubleValue()) { + return String.valueOf((long) value.doubleValue()); + } + return String.valueOf(value); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/resources/index-template/tsdb-index-template.json b/data-prepper-plugins/opensearch/src/main/resources/index-template/tsdb-index-template.json new file mode 100644 index 0000000000..40ea634a3a --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/resources/index-template/tsdb-index-template.json @@ -0,0 +1,33 @@ +{ + "version": 1, + "template": { + "mappings": { + "properties": { + "series_ref": { + "type": "long", + "doc_values": false + }, + "labels": { + "type": "keyword", + "ignore_above": 4096 + }, + "value": { + 
"type": "double", + "doc_values": false + }, + "timestamp": { + "type": "date", + "format": "epoch_millis" + }, + "timestamp_range": { + "type": "long_range" + } + } + }, + "settings": { + "index.translog.durability": "async", + "index.translog.sync_interval": "1s", + "refresh_interval": "1s" + } + } +} diff --git a/data-prepper-plugins/opensearch/src/main/resources/tsdb-index-template.json b/data-prepper-plugins/opensearch/src/main/resources/tsdb-index-template.json new file mode 100644 index 0000000000..bcab4e00b6 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/resources/tsdb-index-template.json @@ -0,0 +1,31 @@ +{ + "version": 1, + "mappings": { + "properties": { + "series_ref": { + "type": "long", + "doc_values": false + }, + "labels": { + "type": "keyword", + "ignore_above": 4096 + }, + "value": { + "type": "double", + "doc_values": false + }, + "timestamp": { + "type": "date", + "format": "epoch_millis" + }, + "timestamp_range": { + "type": "long_range" + } + } + }, + "settings": { + "index.translog.durability": "async", + "index.translog.sync_interval": "1s", + "refresh_interval": "1s" + } +} diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 627ba92d58..505de4372f 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -243,20 +243,18 @@ void test_initialization_with_failure_and_retry_with_query_manager() throws IOEx @Test void test_sink_successful_records_handling_without_forwarding_pipelines_bulk_operations_with_event_handles() throws Exception { when(sinkContext.getForwardToPipelines()).thenReturn(Map.of()); - final EventHandle 
eventHandle = mock(EventHandle.class); final OpenSearchSink objectUnderTest = createObjectUnderTest(); BulkOperationWrapper op1 = mock(BulkOperationWrapper.class); BulkOperationWrapper op2 = mock(BulkOperationWrapper.class); when(op1.getEvent()).thenReturn(null); when(op2.getEvent()).thenReturn(null); - when(op1.getEventHandle()).thenReturn(eventHandle); - when(op2.getEventHandle()).thenReturn(eventHandle); List operationsList = new ArrayList<>(); operationsList.add(op1); operationsList.add(op2); objectUnderTest.successfulOperationsHandler(operationsList); - verify(eventHandle, times(2)).release(eq(true)); - + verify(op1).releaseEventHandle(eq(true)); + verify(op2).releaseEventHandle(eq(true)); + } @Test diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java index a1c0a7067f..c7b6b19505 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java @@ -33,11 +33,12 @@ public void getByValue() { assertEquals(Optional.of(IndexType.LOG_ANALYTICS_PLAIN), IndexType.getByValue("log-analytics-plain")); assertEquals(Optional.of(IndexType.METRIC_ANALYTICS), IndexType.getByValue("metric-analytics")); assertEquals(Optional.of(IndexType.METRIC_ANALYTICS_PLAIN), IndexType.getByValue("metric-analytics-plain")); + assertEquals(Optional.of(IndexType.TSDB), IndexType.getByValue("tsdb")); } @Test public void getIndexTypeValues() { - assertEquals("[trace-analytics-raw, trace-analytics-plain-raw, trace-analytics-service-map, otel-v2-apm-service-map, log-analytics, log-analytics-plain, metric-analytics, metric-analytics-plain, custom, management_disabled]", 
IndexType.getIndexTypeValues()); + assertEquals("[trace-analytics-raw, trace-analytics-plain-raw, trace-analytics-service-map, otel-v2-apm-service-map, log-analytics, log-analytics-plain, metric-analytics, metric-analytics-plain, tsdb, custom, management_disabled]", IndexType.getIndexTypeValues()); } @ParameterizedTest diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java new file mode 100644 index 0000000000..087ed46718 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java @@ -0,0 +1,469 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.metric.DefaultQuantile; +import org.opensearch.dataprepper.model.metric.JacksonGauge; +import org.opensearch.dataprepper.model.metric.JacksonHistogram; +import org.opensearch.dataprepper.model.metric.JacksonSum; +import org.opensearch.dataprepper.model.metric.JacksonSummary; +import org.opensearch.dataprepper.model.metric.Quantile; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TSDBDocumentBuilderTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String TEST_TIME = "2024-02-02T10:30:00Z"; + private static final long TEST_TIMESTAMP_MILLIS = 1706869800000L; + + private TSDBDocumentBuilder builder; + + @BeforeEach + void setUp() { + builder = new TSDBDocumentBuilder(); + } + + @Test + void build_gauge_returns_single_document() throws Exception { + final Map attributes = Map.of("host", "server-01"); + final JacksonGauge gauge = JacksonGauge.builder() + .withName("cpu_temp") + .withValue(72.5) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.buildDocuments(gauge); + + assertEquals(1, docs.size()); + final Map doc = parseJson(docs.get(0)); + assertEquals("__name__ cpu_temp host server-01", doc.get("labels")); + assertEquals(TEST_TIMESTAMP_MILLIS, ((Number) doc.get("timestamp")).longValue()); + 
assertEquals(72.5, ((Number) doc.get("value")).doubleValue(), 0.001); + } + + @Test + void build_gauge_with_no_attributes() throws Exception { + final JacksonGauge gauge = JacksonGauge.builder() + .withName("memory_usage") + .withValue(85.0) + .withTime(TEST_TIME) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.buildDocuments(gauge); + + assertEquals(1, docs.size()); + final Map doc = parseJson(docs.get(0)); + assertEquals("__name__ memory_usage", doc.get("labels")); + assertEquals(85.0, ((Number) doc.get("value")).doubleValue(), 0.001); + } + + @Test + void build_monotonic_sum_adds_total_suffix() throws Exception { + final Map attributes = Map.of("method", "GET"); + final JacksonSum sum = JacksonSum.builder() + .withName("http_requests") + .withValue(100.0) + .withIsMonotonic(true) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("SUM") + .build(); + + final List docs = builder.buildDocuments(sum); + + assertEquals(1, docs.size()); + final Map doc = parseJson(docs.get(0)); + assertEquals("__name__ http_requests_total method GET", doc.get("labels")); + assertEquals(100.0, ((Number) doc.get("value")).doubleValue(), 0.001); + } + + @Test + void build_non_monotonic_sum_no_total_suffix() throws Exception { + final JacksonSum sum = JacksonSum.builder() + .withName("temperature") + .withValue(22.5) + .withIsMonotonic(false) + .withTime(TEST_TIME) + .withEventKind("SUM") + .build(); + + final List docs = builder.buildDocuments(sum); + + assertEquals(1, docs.size()); + final Map doc = parseJson(docs.get(0)); + assertEquals("__name__ temperature", doc.get("labels")); + } + + @Test + void build_monotonic_sum_already_has_total_suffix() throws Exception { + final JacksonSum sum = JacksonSum.builder() + .withName("http_requests_total") + .withValue(100.0) + .withIsMonotonic(true) + .withTime(TEST_TIME) + .withEventKind("SUM") + .build(); + + final List docs = builder.buildDocuments(sum); + + assertEquals(1, docs.size()); + final Map doc 
= parseJson(docs.get(0)); + assertEquals("__name__ http_requests_total", doc.get("labels")); + } + + @Test + void build_histogram_returns_bucket_plus_count_plus_sum_documents() throws Exception { + final Map attributes = Map.of("method", "GET"); + final JacksonHistogram histogram = JacksonHistogram.builder() + .withName("request_duration") + .withSum(5.5) + .withCount(20L) + .withBucketCountsList(List.of(5L, 5L, 5L, 5L)) + .withExplicitBoundsList(List.of(0.1, 0.5, 1.0)) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("HISTOGRAM") + .build(); + + final List docs = builder.buildDocuments(histogram); + + assertEquals(6, docs.size()); + + final Map bucket1 = parseJson(docs.get(0)); + assertEquals("__name__ request_duration_bucket le 0.1 method GET", bucket1.get("labels")); + assertEquals(5.0, ((Number) bucket1.get("value")).doubleValue(), 0.001); + + final Map bucket2 = parseJson(docs.get(1)); + assertEquals("__name__ request_duration_bucket le 0.5 method GET", bucket2.get("labels")); + assertEquals(10.0, ((Number) bucket2.get("value")).doubleValue(), 0.001); + + final Map bucket3 = parseJson(docs.get(2)); + assertEquals("__name__ request_duration_bucket le 1 method GET", bucket3.get("labels")); + assertEquals(15.0, ((Number) bucket3.get("value")).doubleValue(), 0.001); + + final Map bucket4 = parseJson(docs.get(3)); + assertEquals("__name__ request_duration_bucket le +Inf method GET", bucket4.get("labels")); + assertEquals(20.0, ((Number) bucket4.get("value")).doubleValue(), 0.001); + + final Map countDoc = parseJson(docs.get(4)); + assertEquals("__name__ request_duration_count method GET", countDoc.get("labels")); + assertEquals(20.0, ((Number) countDoc.get("value")).doubleValue(), 0.001); + + final Map sumDoc = parseJson(docs.get(5)); + assertEquals("__name__ request_duration_sum method GET", sumDoc.get("labels")); + assertEquals(5.5, ((Number) sumDoc.get("value")).doubleValue(), 0.001); + } + + @Test + void 
build_histogram_all_documents_have_same_timestamp() throws Exception { + final JacksonHistogram histogram = JacksonHistogram.builder() + .withName("latency") + .withSum(1.0) + .withCount(10L) + .withBucketCountsList(List.of(5L, 5L)) + .withExplicitBoundsList(List.of(0.5)) + .withTime(TEST_TIME) + .withEventKind("HISTOGRAM") + .build(); + + final List docs = builder.buildDocuments(histogram); + + for (final String jsonDoc : docs) { + final Map doc = parseJson(jsonDoc); + assertEquals(TEST_TIMESTAMP_MILLIS, ((Number) doc.get("timestamp")).longValue()); + } + } + + @Test + void build_summary_returns_quantile_plus_count_plus_sum_documents() throws Exception { + final Map attributes = Map.of("service", "api"); + final List quantiles = Arrays.asList( + new DefaultQuantile(0.5, 0.2), + new DefaultQuantile(0.99, 0.8) + ); + final JacksonSummary summary = JacksonSummary.builder() + .withName("rpc_latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(2) + .withCount(1000L) + .withSum(300.5) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("SUMMARY") + .build(); + + final List docs = builder.buildDocuments(summary); + + assertEquals(4, docs.size()); + + final Map q1 = parseJson(docs.get(0)); + assertEquals("__name__ rpc_latency quantile 0.5 service api", q1.get("labels")); + assertEquals(0.2, ((Number) q1.get("value")).doubleValue(), 0.001); + + final Map q2 = parseJson(docs.get(1)); + assertEquals("__name__ rpc_latency quantile 0.99 service api", q2.get("labels")); + assertEquals(0.8, ((Number) q2.get("value")).doubleValue(), 0.001); + + final Map countDoc = parseJson(docs.get(2)); + assertEquals("__name__ rpc_latency_count service api", countDoc.get("labels")); + assertEquals(1000.0, ((Number) countDoc.get("value")).doubleValue(), 0.001); + + final Map sumDoc = parseJson(docs.get(3)); + assertEquals("__name__ rpc_latency_sum service api", sumDoc.get("labels")); + assertEquals(300.5, ((Number) sumDoc.get("value")).doubleValue(), 0.001); + } + 
+ @Test + void labels_are_sorted_lexicographically() throws Exception { + final Map attributes = new HashMap<>(); + attributes.put("zone", "us-east"); + attributes.put("app", "myservice"); + attributes.put("host", "server-01"); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("cpu_temp") + .withValue(1.0) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.buildDocuments(gauge); + final Map doc = parseJson(docs.get(0)); + + assertEquals("__name__ cpu_temp app myservice host server-01 zone us-east", doc.get("labels")); + } + + @Test + void label_values_with_spaces_are_sanitized() throws Exception { + final Map attributes = Map.of("handler", "/api/items with spaces"); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("requests") + .withValue(1.0) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.buildDocuments(gauge); + final Map doc = parseJson(docs.get(0)); + + assertTrue(((String) doc.get("labels")).contains("handler /api/items_with_spaces")); + } + + @Test + void label_keys_with_invalid_chars_are_sanitized() throws Exception { + final Map attributes = Map.of("123", "val"); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("test") + .withValue(1.0) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.buildDocuments(gauge); + final Map doc = parseJson(docs.get(0)); + + assertTrue(((String) doc.get("labels")).contains("_123 val")); + } + + @Test + void label_keys_with_dashes_are_sanitized() throws Exception { + final Map attributes = Map.of("key-with-dash", "val"); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("test") + .withValue(1.0) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.buildDocuments(gauge); + final Map doc = 
parseJson(docs.get(0)); + + assertTrue(((String) doc.get("labels")).contains("key_with_dash val")); + } + + @Test + void extra_label_is_merge_inserted_at_correct_sorted_position() throws Exception { + final Map attributes = new HashMap<>(); + attributes.put("method", "GET"); + attributes.put("region", "us-east"); + + final JacksonHistogram histogram = JacksonHistogram.builder() + .withName("duration") + .withSum(1.0) + .withCount(10L) + .withBucketCountsList(List.of(10L)) + .withExplicitBoundsList(List.of(0.5)) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("HISTOGRAM") + .build(); + + final List docs = builder.buildDocuments(histogram); + final Map bucketDoc = parseJson(docs.get(0)); + + assertEquals("__name__ duration_bucket le 0.5 method GET region us-east", bucketDoc.get("labels")); + } + + @Test + void extra_label_sorting_after_all_attributes() throws Exception { + final Map attributes = Map.of("app", "web"); + final List quantiles = Arrays.asList(new DefaultQuantile(0.99, 1.0)); + + final JacksonSummary summary = JacksonSummary.builder() + .withName("latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(1) + .withCount(1L) + .withSum(1.0) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("SUMMARY") + .build(); + + final List docs = builder.buildDocuments(summary); + final Map quantileDoc = parseJson(docs.get(0)); + + assertEquals("__name__ latency app web quantile 0.99", quantileDoc.get("labels")); + } + + @Test + void build_with_valid_timestamp() throws Exception { + final JacksonGauge gauge = JacksonGauge.builder() + .withName("test") + .withValue(1.0) + .withTime("2024-02-02T10:30:00Z") + .withEventKind("GAUGE") + .build(); + + final List docs = builder.buildDocuments(gauge); + final Map doc = parseJson(docs.get(0)); + + assertEquals(TEST_TIMESTAMP_MILLIS, ((Number) doc.get("timestamp")).longValue()); + } + + @Test + void build_throws_for_non_metric_event() { + final Event event = JacksonEvent.builder() 
+ .withEventType("LOG") + .withData(Map.of("message", "hello")) + .build(); + + assertThrows(IllegalArgumentException.class, () -> builder.buildDocuments(event)); + } + + @Test + void build_summary_skips_quantiles_with_NaN_values() throws Exception { + final Map attributes = Map.of("service", "api"); + final List quantiles = Arrays.asList( + new DefaultQuantile(0.5, Double.NaN), + new DefaultQuantile(0.99, 0.8) + ); + final JacksonSummary summary = JacksonSummary.builder() + .withName("rpc_latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(2) + .withCount(1000L) + .withSum(300.5) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("SUMMARY") + .build(); + + final List docs = builder.buildDocuments(summary); + + assertEquals(3, docs.size()); + + final Map q1 = parseJson(docs.get(0)); + assertEquals("__name__ rpc_latency quantile 0.99 service api", q1.get("labels")); + assertEquals(0.8, ((Number) q1.get("value")).doubleValue(), 0.001); + + final Map countDoc = parseJson(docs.get(1)); + assertEquals("__name__ rpc_latency_count service api", countDoc.get("labels")); + + final Map sumDoc = parseJson(docs.get(2)); + assertEquals("__name__ rpc_latency_sum service api", sumDoc.get("labels")); + } + + @Test + void build_summary_skips_all_quantiles_when_all_NaN() throws Exception { + final List quantiles = Arrays.asList( + new DefaultQuantile(0.5, Double.NaN), + new DefaultQuantile(0.99, Double.NaN) + ); + final JacksonSummary summary = JacksonSummary.builder() + .withName("rpc_latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(2) + .withCount(0L) + .withSum(0.0) + .withTime(TEST_TIME) + .withAttributes(Map.of()) + .withEventKind("SUMMARY") + .build(); + + final List docs = builder.buildDocuments(summary); + + assertEquals(2, docs.size()); + assertTrue(parseJson(docs.get(0)).get("labels").toString().contains("_count")); + assertTrue(parseJson(docs.get(1)).get("labels").toString().contains("_sum")); + } + + @Test + void 
build_summary_skips_quantiles_with_infinite_values() throws Exception { + final List quantiles = Arrays.asList( + new DefaultQuantile(0.5, Double.POSITIVE_INFINITY), + new DefaultQuantile(0.99, 0.5) + ); + final JacksonSummary summary = JacksonSummary.builder() + .withName("latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(2) + .withCount(100L) + .withSum(50.0) + .withTime(TEST_TIME) + .withAttributes(Map.of()) + .withEventKind("SUMMARY") + .build(); + + final List docs = builder.buildDocuments(summary); + + assertEquals(3, docs.size()); + final Map q1 = parseJson(docs.get(0)); + assertEquals(0.5, ((Number) q1.get("value")).doubleValue(), 0.001); + } + + private Map parseJson(final String json) throws Exception { + return OBJECT_MAPPER.readValue(json, new TypeReference>() {}); + } +}