From 01d8770eb410a1c44994be83ded005d5814fecdb Mon Sep 17 00:00:00 2001 From: Srikanth Padakanti Date: Fri, 13 Mar 2026 02:28:11 -0500 Subject: [PATCH 1/5] Add index_type tsdb support to OpenSearch sink for Prometheus metrics Signed-off-by: Srikanth Padakanti --- .../sink/opensearch/OpenSearchSinkTsdbIT.java | 528 ++++++++++++++++++ .../sink/opensearch/OpenSearchSink.java | 66 ++- .../opensearch/index/IndexConfiguration.java | 2 + .../sink/opensearch/index/IndexConstants.java | 3 + .../opensearch/index/IndexManagerFactory.java | 1 + .../sink/opensearch/index/IndexType.java | 1 + .../opensearch/index/TSDBDocumentBuilder.java | 260 +++++++++ .../index-template/tsdb-index-template.json | 33 ++ .../main/resources/tsdb-index-template.json | 31 + .../sink/opensearch/OpenSearchSinkTest.java | 8 +- .../sink/opensearch/index/IndexTypeTests.java | 3 +- .../index/TSDBDocumentBuilderTest.java | 417 ++++++++++++++ 12 files changed, 1332 insertions(+), 21 deletions(-) create mode 100644 data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java create mode 100644 data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java create mode 100644 data-prepper-plugins/opensearch/src/main/resources/index-template/tsdb-index-template.json create mode 100644 data-prepper-plugins/opensearch/src/main/resources/tsdb-index-template.json create mode 100644 data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java new file mode 100644 index 0000000000..2603d95657 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java @@ -0,0 +1,528 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.Measurement; +import org.apache.http.util.EntityUtils; +import org.junit.Assert; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.DisabledIf; +import org.mockito.Mock; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.RestClient; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.MetricNames; +import org.opensearch.dataprepper.metrics.MetricsTestUtil; +import org.opensearch.dataprepper.model.configuration.PipelineDescription; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.metric.DefaultQuantile; +import org.opensearch.dataprepper.model.metric.JacksonGauge; +import org.opensearch.dataprepper.model.metric.JacksonHistogram; +import org.opensearch.dataprepper.model.metric.JacksonSum; +import org.opensearch.dataprepper.model.metric.JacksonSummary; +import org.opensearch.dataprepper.model.metric.Quantile; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; + +import javax.ws.rs.HttpMethod; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.apache.http.HttpStatus.SC_OK; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.getHosts; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.waitForClusterStateUpdatesToFinish; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.wipeAllTemplates; + +/** + * Integration tests for the OpenSearch sink with {@code index_type: tsdb}. + * + *

These tests require a running OpenSearch cluster. Configure via system properties: + *

+ * + *

Note: TSDB-specific settings ({@code tsdb_engine.enabled}, {@code tsdb_store}) require the TSDB plugin + * installed on the OpenSearch cluster. These tests validate the sink's document conversion and indexing + * behavior using the standard TSDB index template. If the TSDB plugin is not installed, index creation + * will use the template mappings but TSDB-specific settings will be ignored by OpenSearch. + */ +class OpenSearchSinkTsdbIT { + + private static final String AUTHENTICATION = "authentication"; + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; + private static final String PLUGIN_NAME = "opensearch"; + private static final String PIPELINE_NAME = "tsdbIntegTestPipeline"; + private static final String INCLUDE_TYPE_NAME_FALSE_URI = "?include_type_name=false"; + private static final String TEST_TIME = "2024-02-02T10:30:00Z"; + + private RestClient client; + private final List sinksToShutdown = new ArrayList<>(); + private ObjectMapper objectMapper; + + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + @Mock + private ExpressionEvaluator expressionEvaluator; + @Mock + private PipelineDescription pipelineDescription; + @Mock + private PluginSetting pluginSetting; + @Mock + private PluginConfigObservable pluginConfigObservable; + + @BeforeEach + void setup() { + pluginConfigObservable = mock(PluginConfigObservable.class); + expressionEvaluator = mock(ExpressionEvaluator.class); + pipelineDescription = mock(PipelineDescription.class); + pluginSetting = mock(PluginSetting.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); + when(expressionEvaluator.isValidExpressionStatement(any(String.class))).thenReturn(false); + objectMapper = new ObjectMapper(); + } + + @BeforeEach + void metricsInit() throws IOException { + MetricsTestUtil.initMetrics(); + client = createOpenSearchClient(); + } + + @AfterEach + void cleanOpenSearch() throws Exception { + wipeAllOpenSearchIndices(); + wipeAllTemplates(); + waitForClusterStateUpdatesToFinish(); + } + + @AfterEach + void shutdownSinks() { + for (final OpenSearchSink sink : sinksToShutdown) { + sink.shutdown(); + } + sinksToShutdown.clear(); + } + + @AfterEach + void closeClient() throws IOException { + if (client != null) { + client.close(); + } + } + + // --- Index Initialization --- + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testInstantiateSinkTsdbDefault() throws IOException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + assertThat(indexAlias, equalTo("metrics-tsdb-v1")); + + final Request request = new Request(HttpMethod.HEAD, indexAlias); + final Response response = client.performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + + // Query mappings via alias (TSDB uses NoIsmPolicyManagement so index name may not follow -000001 pattern) + final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; + final Request mappingRequest = new Request(HttpMethod.GET, indexAlias + "/_mappings" + extraURI); + final Response mappingResponse = client.performRequest(mappingRequest); + final String mappingBody = EntityUtils.toString(mappingResponse.getEntity()); + @SuppressWarnings("unchecked") + final Map mappingResult = createContentParser(XContentType.JSON.xContent(), mappingBody).map(); + final String actualIndex = mappingResult.keySet().iterator().next(); + @SuppressWarnings("unchecked") + final Map mappings = (Map) ((Map) mappingResult.get(actualIndex)).get("mappings"); + assertThat(mappings, notNullValue()); + + // Verify TSDB-specific mapping fields + @SuppressWarnings("unchecked") + final Map properties = (Map) mappings.get("properties"); + assertThat(properties, notNullValue()); + assertThat(properties.containsKey("labels"), equalTo(true)); + assertThat(properties.containsKey("timestamp"), equalTo(true)); + assertThat(properties.containsKey("value"), equalTo(true)); + + // TSDB uses NoIsmPolicyManagement — no ISM policy should be attached + // (unlike metric-analytics which has ISM) + } + + // --- Gauge Output --- + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testOutputGauge() throws IOException, InterruptedException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("cpu_temp") + .withValue(72.5) + .withTime(TEST_TIME) + .withAttributes(Map.of("host", "server-01")) + .withEventKind("GAUGE") + .build(); + + sink.output(List.of(new Record<>(gauge))); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + final List> sources = getSearchResponseDocSources(indexAlias); + assertThat(sources, hasSize(1)); + + final Map doc = sources.get(0); + assertThat(doc.get("labels"), equalTo("__name__ cpu_temp host server-01")); + assertThat(((Number) doc.get("value")).doubleValue(), closeTo(72.5, 0.001)); + assertThat(doc.get("timestamp"), notNullValue()); + + // Verify metrics + final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); + assertThat(bulkRequestErrors.size(), equalTo(1)); + Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); + } + + // --- Sum (Counter) Output --- + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testOutputMonotonicSum() throws IOException, InterruptedException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final JacksonSum sum = JacksonSum.builder() + .withName("http_requests") + .withValue(100.0) + .withIsMonotonic(true) + .withTime(TEST_TIME) + .withAttributes(Map.of("method", "GET")) + .withEventKind("SUM") + .build(); + + sink.output(List.of(new Record<>(sum))); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + final List> sources = getSearchResponseDocSources(indexAlias); + assertThat(sources, hasSize(1)); + + final Map doc = sources.get(0); + // Monotonic sum should have _total suffix + assertThat(doc.get("labels"), equalTo("__name__ http_requests_total method GET")); + assertThat(((Number) doc.get("value")).doubleValue(), closeTo(100.0, 0.001)); + + // Verify metrics + final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); + assertThat(bulkRequestErrors.size(), equalTo(1)); + Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); + } + + // --- Histogram Expansion --- + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testOutputHistogramExpansion() throws IOException, InterruptedException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final JacksonHistogram histogram = JacksonHistogram.builder() + .withName("request_duration") + .withSum(5.5) + .withCount(20L) + .withBucketCountsList(List.of(5L, 5L, 5L, 5L)) + .withExplicitBoundsList(List.of(0.1, 0.5, 1.0)) + .withTime(TEST_TIME) + .withAttributes(Map.of("method", "GET")) + .withEventKind("HISTOGRAM") + .build(); + + sink.output(List.of(new Record<>(histogram))); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + final List> sources = getSearchResponseDocSources(indexAlias); + // 4 bucket docs + 1 _count + 1 _sum = 6 + assertThat(sources, hasSize(6)); + + // Collect labels for verification + final List labels = sources.stream() + .map(s -> (String) s.get("labels")) + .collect(Collectors.toList()); + + // Verify bucket docs exist with cumulative counts + assertThat(labels.stream().filter(l -> l.contains("request_duration_bucket")).count(), equalTo(4L)); + assertThat(labels.stream().filter(l -> l.contains("request_duration_count")).count(), equalTo(1L)); + assertThat(labels.stream().filter(l -> l.contains("request_duration_sum")).count(), equalTo(1L)); + + // Verify cumulative bucket values + final Map labelToValue = sources.stream() + .collect(Collectors.toMap(s -> (String) s.get("labels"), s -> ((Number) s.get("value")).doubleValue())); + + assertThat(labelToValue.get("__name__ request_duration_bucket le 0.1 method GET"), closeTo(5.0, 0.001)); + assertThat(labelToValue.get("__name__ request_duration_bucket le 0.5 method GET"), closeTo(10.0, 0.001)); + assertThat(labelToValue.get("__name__ request_duration_bucket le 1 method GET"), closeTo(15.0, 0.001)); + assertThat(labelToValue.get("__name__ request_duration_bucket le +Inf method GET"), closeTo(20.0, 0.001)); + assertThat(labelToValue.get("__name__ request_duration_count method GET"), closeTo(20.0, 0.001)); + assertThat(labelToValue.get("__name__ request_duration_sum method GET"), closeTo(5.5, 0.001)); + + // Verify document success count matches expanded document count + final List documentsSuccess = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); + assertThat(documentsSuccess.size(), equalTo(1)); + assertThat(documentsSuccess.get(0).getValue(), closeTo(6.0, 0)); + + // Verify no errors + final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); + assertThat(bulkRequestErrors.size(), equalTo(1)); + Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); + } + + // --- Summary Expansion --- + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testOutputSummaryExpansion() throws IOException, InterruptedException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final List quantiles = Arrays.asList( + new DefaultQuantile(0.5, 0.2), + new DefaultQuantile(0.99, 0.8) + ); + final JacksonSummary summary = JacksonSummary.builder() + .withName("rpc_latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(2) + .withCount(1000L) + .withSum(300.5) + .withTime(TEST_TIME) + .withAttributes(Map.of("service", "api")) + .withEventKind("SUMMARY") + .build(); + + sink.output(List.of(new Record<>(summary))); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + final List> sources = getSearchResponseDocSources(indexAlias); + // 2 quantile docs + 1 _count + 1 _sum = 4 + assertThat(sources, hasSize(4)); + + final Map labelToValue = sources.stream() + .collect(Collectors.toMap(s -> (String) s.get("labels"), s -> ((Number) s.get("value")).doubleValue())); + + assertThat(labelToValue.get("__name__ rpc_latency quantile 0.5 service api"), closeTo(0.2, 0.001)); + assertThat(labelToValue.get("__name__ rpc_latency quantile 0.99 service api"), closeTo(0.8, 0.001)); + assertThat(labelToValue.get("__name__ rpc_latency_count service api"), closeTo(1000.0, 0.001)); + assertThat(labelToValue.get("__name__ rpc_latency_sum service api"), closeTo(300.5, 0.001)); + + // Verify metrics + final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); + assertThat(bulkRequestErrors.size(), equalTo(1)); + Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); + } + + // --- Multiple Metrics in Single Batch --- + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testOutputMixedMetricTypes() throws IOException, InterruptedException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + final OpenSearchSink sink = createObjectUnderTest(config, true); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("cpu_temp") + .withValue(72.5) + .withTime(TEST_TIME) + .withAttributes(Map.of("host", "server-01")) + .withEventKind("GAUGE") + .build(); + + final JacksonSum counter = JacksonSum.builder() + .withName("requests") + .withValue(500.0) + .withIsMonotonic(true) + .withTime(TEST_TIME) + .withAttributes(Map.of("path", "/api")) + .withEventKind("SUM") + .build(); + + final List> records = List.of(new Record<>(gauge), new Record<>(counter)); + sink.output(records); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + final List> sources = getSearchResponseDocSources(indexAlias); + // 1 gauge + 1 sum = 2 documents + assertThat(sources, hasSize(2)); + + // Verify metrics + final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); + assertThat(bulkRequestErrors.size(), equalTo(1)); + Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); + } + + // --- Re-instantiation (no duplicate index) --- + + @Test + @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") + @Timeout(value = 50, unit = TimeUnit.SECONDS) + void testReinstantiateSinkDoesNotCreateDuplicateIndex() throws IOException { + final OpenSearchSinkConfig config = generateOpenSearchSinkConfig(IndexType.TSDB.getValue(), null, null); + createObjectUnderTest(config, true); + + final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); + Request request = new Request(HttpMethod.HEAD, indexAlias); + Response response = client.performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + + // Reinstantiate sink — should not fail + createObjectUnderTest(config, true); + + request = new Request(HttpMethod.HEAD, indexAlias); + response = client.performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); + } + + // --- Helper methods --- + + private OpenSearchSink createObjectUnderTest(final OpenSearchSinkConfig openSearchSinkConfig, final boolean doInitialize) { + final SinkContext sinkContext = mock(SinkContext.class); + when(sinkContext.getTagsTargetKey()).thenReturn(null); + when(sinkContext.getForwardToPipelines()).thenReturn(Map.of()); + when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME); + when(pluginSetting.getPipelineName()).thenReturn(PIPELINE_NAME); + when(pluginSetting.getName()).thenReturn(PLUGIN_NAME); + final OpenSearchSink sink = new OpenSearchSink( + pluginSetting, sinkContext, expressionEvaluator, awsCredentialsSupplier, + pipelineDescription, pluginConfigObservable, openSearchSinkConfig); + if (doInitialize) { + sink.doInitialize(); + } + sinksToShutdown.add(sink); + return sink; + } + + private OpenSearchSinkConfig generateOpenSearchSinkConfig(final String indexType, final String indexAlias, + final String templateFilePath) throws JsonProcessingException { + final Map metadata = new HashMap<>(); + metadata.put(IndexConfiguration.INDEX_TYPE, indexType); + metadata.put(ConnectionConfiguration.HOSTS, getHosts()); + metadata.put(IndexConfiguration.INDEX_ALIAS, indexAlias); + metadata.put(IndexConfiguration.TEMPLATE_FILE, templateFilePath); + metadata.put(IndexConfiguration.FLUSH_TIMEOUT, -1); + metadata.put("insecure", true); + final String user = System.getProperty("tests.opensearch.user"); + final String password = System.getProperty("tests.opensearch.password"); + if (user != null) { + metadata.put(AUTHENTICATION, Map.of(USERNAME, user, PASSWORD, password)); + } + final String distributionVersion = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? + DistributionVersion.ES6.getVersion() : DistributionVersion.DEFAULT.getVersion(); + metadata.put(IndexConfiguration.DISTRIBUTION_VERSION, distributionVersion); + + final String json = new ObjectMapper().writeValueAsString(metadata); + return objectMapper.readValue(json, OpenSearchSinkConfig.class); + } + + private List> getSearchResponseDocSources(final String index) throws IOException { + final Request refresh = new Request(HttpMethod.POST, index + "/_refresh"); + client.performRequest(refresh); + final Request request = new Request(HttpMethod.GET, index + "/_search"); + final Response response = client.performRequest(request); + final String responseBody = EntityUtils.toString(response.getEntity()); + + @SuppressWarnings("unchecked") + final List hits = (List) ((Map) createContentParser( + XContentType.JSON.xContent(), responseBody).map().get("hits")).get("hits"); + @SuppressWarnings("unchecked") + final List> sources = hits.stream() + .map(hit -> (Map) ((Map) hit).get("_source")) + .collect(Collectors.toList()); + return sources; + } + + private void wipeAllOpenSearchIndices() throws IOException { + final String getIndicesEndpoint = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( + OpenSearchIntegrationHelper.getVersion()) >= 0 ? + "/*?expand_wildcards=all&include_type_name=false" : "/*?expand_wildcards=all"; + final Response response = client.performRequest(new Request("GET", getIndicesEndpoint)); + final String responseBody = EntityUtils.toString(response.getEntity()); + final Map indexContent = createContentParser(XContentType.JSON.xContent(), responseBody).map(); + final Set indices = indexContent.keySet(); + + indices.stream() + .filter(Objects::nonNull) + .filter(Predicate.not(indexName -> indexName.startsWith(".opendistro-"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".opendistro_"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch-"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".opensearch_"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".ql"))) + .filter(Predicate.not(indexName -> indexName.startsWith(".plugins-ml-config"))) + .forEach(indexName -> { + try { + client.performRequest(new Request("DELETE", "/" + indexName)); + } catch (final IOException e) { + throw new RuntimeException(e); + } + }); + } + + private static boolean isES6() { + return DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(OpenSearchIntegrationHelper.getVersion()) >= 0; + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 7fcae130e9..02fc903500 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -32,6 +32,7 @@ import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException; import org.opensearch.dataprepper.model.failures.DlqObject; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; @@ -72,6 +73,7 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapper; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.TSDBDocumentBuilder; import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ServerlessOptions; import org.slf4j.Logger; @@ -168,6 +170,8 @@ public class OpenSearchSink extends AbstractSink> { private ExistingDocumentQueryManager existingDocumentQueryManager; + private final TSDBDocumentBuilder tsdbDocumentBuilder; + private final ExecutorService queryExecutorService; private final int processWorkerThreads; @@ -218,6 +222,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.pluginConfigObservable = pluginConfigObservable; this.objectMapper = new ObjectMapper(); this.bulkOperationFactory = new BulkOperationFactory(versionType, scriptManager, objectMapper, isUsingDocumentFilters()); + this.tsdbDocumentBuilder = (this.indexType == IndexType.TSDB) ? new TSDBDocumentBuilder() : null; this.queryExecutorService = openSearchSinkConfig.getIndexConfiguration().getQueryTerm() != null ? Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("existing-document-query-manager")) : null; @@ -384,6 +389,32 @@ public void doOutput(final Collection> records) { } dataStreamIndex.ensureTimestamp(event, indexName); + + if (indexType == IndexType.TSDB) { + try { + final List tsdbDocs = tsdbDocumentBuilder.build(event); + final String tsdbAction = resolveEventAction(event); + final List wrappers = new ArrayList<>(tsdbDocs.size()); + for (int i = 0; i < tsdbDocs.size(); i++) { + final SerializedJson doc = SerializedJson.fromStringAndOptionals(tsdbDocs.get(i), null, null, null); + final BulkOperation op = getBulkOperationForAction(tsdbAction, doc, null, indexName, null); + final BulkOperationWrapper wrapper = (i == 0) + ? new BulkOperationWrapper(op, event.getEventHandle(), null, null) + : new BulkOperationWrapper(op, (EventHandle) null, null, null); + wrappers.add(wrapper); + } + for (final BulkOperationWrapper wrapper : wrappers) { + bulkRequest = flushBatch(bulkRequest, wrapper, lastFlushTime); + bulkRequest.addOperation(wrapper); + } + } catch (final Exception e) { + LOG.error("Failed to build TSDB documents for event: {}", e.getMessage(), e); + dynamicIndexDroppedEvents.increment(); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); + } + continue; + } + final SerializedJson document = getDocument(event); Long version = null; @@ -416,20 +447,7 @@ public void doOutput(final Collection> records) { } } - String eventAction = action; - if (actions != null) { - for (final ActionConfiguration actionEntry: actions) { - final String condition = actionEntry.getWhen(); - eventAction = actionEntry.getType(); - if (condition != null && - expressionEvaluator.evaluateConditional(condition, event)) { - break; - } - } - } - if (eventAction.contains("${")) { - eventAction = event.formatString(eventAction, expressionEvaluator); - } + String eventAction = resolveEventAction(event); if (dataStreamDetector.isDataStream(indexName)) { eventAction = dataStreamIndex.determineAction(eventAction, indexName); @@ -542,7 +560,7 @@ void successfulOperationsHandler(final List successfulOper if (bulkOperation.getEvent() != null) { bulkOperation.getEvent().getEventHandle().release(true); } else { - bulkOperation.getEventHandle().release(true); + bulkOperation.releaseEventHandle(true); } } return; @@ -699,6 +717,24 @@ private DlqObject createDlqObjectFromEvent(final Event event, return builder.build(); } + private String resolveEventAction(final Event event) { + String resolvedAction = action; + if (actions != null) { + for (final ActionConfiguration actionEntry : actions) { + final String condition = actionEntry.getWhen(); + resolvedAction = actionEntry.getType(); + if (condition != null && + expressionEvaluator.evaluateConditional(condition, event)) { + break; + } + } + } + if (resolvedAction.contains("${")) { + resolvedAction = event.formatString(resolvedAction, expressionEvaluator); + } + return resolvedAction; + } + /** * This function is used for update and upsert bulk actions to determine whether the original JsonNode needs to be filtered down * based on the user's sink configuration. If a new parameter manipulates the document before sending to OpenSearch, it needs to be added to diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 9bb73183b4..c55e317952 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -457,6 +457,8 @@ private Map readIndexTemplate(final String templateFile, final I templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_DEFAULT_TEMPLATE_FILE); } else if (indexType.equals(IndexType.METRIC_ANALYTICS_PLAIN)) { templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_STANDARD_TEMPLATE_FILE); + } else if (indexType.equals(IndexType.TSDB)) { + templateURL = loadExistingTemplate(templateType, IndexConstants.TSDB_DEFAULT_TEMPLATE_FILE); } else if (templateFile != null) { if (templateFile.toLowerCase().startsWith(S3_PREFIX)) { FileReader s3FileReader = new S3FileReader(s3Client); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java index 2df48191e7..08e2be0e4e 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConstants.java @@ -42,6 +42,8 @@ public class IndexConstants { public static final String OTEL_APM_SERVICE_MAP_ISM_FILE_NO_ISM_TEMPLATE = "otel-v2-apm-service-map-policy-no-ism-template.json"; public static final String OTEL_APM_SERVICE_MAP_ISM_FILE_WITH_ISM_TEMPLATE = "otel-v2-apm-service-map-policy-with-ism-template.json"; + public static final String TSDB_DEFAULT_TEMPLATE_FILE = "tsdb-index-template.json"; + static { // TODO: extract out version number into version enum TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_SERVICE_MAP, "otel-v1-apm-service-map"); @@ -52,5 +54,6 @@ public class IndexConstants { TYPE_TO_DEFAULT_ALIAS.put(IndexType.LOG_ANALYTICS_PLAIN, "logs-otel-v1"); TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS, "metrics-otel-v1"); TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS_PLAIN, "metrics-otel-v1"); + TYPE_TO_DEFAULT_ALIAS.put(IndexType.TSDB, "metrics-tsdb-v1"); } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java index 3781bd9b06..0acacfc458 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java @@ -56,6 +56,7 @@ public final IndexManager getIndexManager(final IndexType indexType, restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias); break; case TRACE_ANALYTICS_SERVICE_MAP: + case TSDB: indexManager = new TraceAnalyticsServiceMapIndexManager( restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias); break; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java index 3977dc398c..bca777d859 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexType.java @@ -20,6 +20,7 @@ public enum IndexType { LOG_ANALYTICS_PLAIN("log-analytics-plain"), METRIC_ANALYTICS("metric-analytics"), METRIC_ANALYTICS_PLAIN("metric-analytics-plain"), + TSDB("tsdb"), CUSTOM("custom"), MANAGEMENT_DISABLED("management_disabled"); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java new file mode 100644 index 0000000000..bbb9830fdd --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java @@ -0,0 +1,260 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.metric.Gauge; +import org.opensearch.dataprepper.model.metric.Histogram; +import org.opensearch.dataprepper.model.metric.Metric; +import org.opensearch.dataprepper.model.metric.Quantile; +import org.opensearch.dataprepper.model.metric.Sum; +import org.opensearch.dataprepper.model.metric.Summary; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +public final class TSDBDocumentBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(TSDBDocumentBuilder.class); + private static final String NAME_LABEL = "__name__"; + private static final char[] HEX_CHARS = "0123456789abcdef".toCharArray(); + + public List build(final Event event) { + if (!(event instanceof Metric)) { + throw new IllegalArgumentException( + "TSDB index_type requires Metric events. Received: " + event.getClass().getName()); + } + + final Metric metric = (Metric) event; + final long timestamp = parseTimestamp(metric.getTime()); + final Map attributes = metric.getAttributes() != null + ? metric.getAttributes() : Collections.emptyMap(); + final String[][] sortedAttrs = sortAndSanitizeAttributes(attributes); + + if (metric instanceof Gauge) { + return buildGauge((Gauge) metric, sortedAttrs, timestamp); + } else if (metric instanceof Sum) { + return buildSum((Sum) metric, sortedAttrs, timestamp); + } else if (metric instanceof Histogram) { + return buildHistogram((Histogram) metric, sortedAttrs, timestamp); + } else if (metric instanceof Summary) { + return buildSummary((Summary) metric, sortedAttrs, timestamp); + } + + LOG.warn("Unsupported metric kind '{}' for metric '{}', building single document from value 0.0", + metric.getKind(), metric.getName()); + return List.of(buildJsonDoc(buildLabels(metric.getName(), sortedAttrs, null, null), timestamp, 0.0)); + } + + private static List buildGauge(final Gauge gauge, final String[][] sortedAttrs, final long timestamp) { + final double value = gauge.getValue() != null ? gauge.getValue() : 0.0; + final String labels = buildLabels(gauge.getName(), sortedAttrs, null, null); + return List.of(buildJsonDoc(labels, timestamp, value)); + } + + private static List buildSum(final Sum sum, final String[][] sortedAttrs, final long timestamp) { + final double value = sum.getValue() != null ? sum.getValue() : 0.0; + final String name = sum.isMonotonic() && !sum.getName().endsWith("_total") + ? sum.getName() + "_total" + : sum.getName(); + final String labels = buildLabels(name, sortedAttrs, null, null); + return List.of(buildJsonDoc(labels, timestamp, value)); + } + + private static List buildHistogram(final Histogram histogram, final String[][] sortedAttrs, final long timestamp) { + final String baseName = histogram.getName(); + final List bucketCounts = histogram.getBucketCountsList(); + final List explicitBounds = histogram.getExplicitBoundsList(); + final int bucketSize = (bucketCounts != null && explicitBounds != null) ? bucketCounts.size() : 0; + final List documents = new ArrayList<>(bucketSize + 2); + + if (bucketSize > 0) { + final String bucketName = baseName + "_bucket"; + long cumulativeCount = 0; + for (int i = 0; i < bucketCounts.size(); i++) { + cumulativeCount += bucketCounts.get(i); + final String le = i < explicitBounds.size() + ? formatDouble(explicitBounds.get(i)) + : "+Inf"; + final String labels = buildLabels(bucketName, sortedAttrs, "le", le); + documents.add(buildJsonDoc(labels, timestamp, (double) cumulativeCount)); + } + } + + appendCountAndSumDocuments(documents, baseName, histogram.getCount(), histogram.getSum(), sortedAttrs, timestamp); + return documents; + } + + private static List buildSummary(final Summary summary, final String[][] sortedAttrs, final long timestamp) { + final String baseName = summary.getName(); + final List quantiles = summary.getQuantiles(); + final int quantileSize = quantiles != null ? quantiles.size() : 0; + final List documents = new ArrayList<>(quantileSize + 2); + + if (quantiles != null) { + for (final Quantile q : quantiles) { + final String labels = buildLabels(baseName, sortedAttrs, "quantile", formatDouble(q.getQuantile())); + documents.add(buildJsonDoc(labels, timestamp, q.getValue() != null ? q.getValue() : 0.0)); + } + } + + appendCountAndSumDocuments(documents, baseName, summary.getCount(), summary.getSum(), sortedAttrs, timestamp); + return documents; + } + + private static void appendCountAndSumDocuments(final List documents, final String baseName, + final Long count, final Double sum, + final String[][] sortedAttrs, final long timestamp) { + documents.add(buildJsonDoc( + buildLabels(baseName + "_count", sortedAttrs, null, null), + timestamp, count != null ? count.doubleValue() : 0.0)); + documents.add(buildJsonDoc( + buildLabels(baseName + "_sum", sortedAttrs, null, null), + timestamp, sum != null ? sum : 0.0)); + } + + private static String[][] sortAndSanitizeAttributes(final Map attributes) { + if (attributes == null || attributes.isEmpty()) { + return new String[0][]; + } + final String[][] entries = new String[attributes.size()][]; + int idx = 0; + for (final Map.Entry e : attributes.entrySet()) { + if (e.getValue() != null) { + entries[idx++] = new String[]{sanitizeLabelKey(e.getKey()), sanitizeLabelValue(e.getValue().toString())}; + } + } + final String[][] trimmed = idx == entries.length ? entries : Arrays.copyOf(entries, idx); + Arrays.sort(trimmed, Comparator.comparing(a -> a[0])); + return trimmed; + } + + private static String buildLabels(final String metricName, final String[][] sortedAttrs, + final String extraKey, final String extraValue) { + final int estimatedSize = 10 + metricName.length() + + sortedAttrs.length * 20 + + (extraKey != null ? extraKey.length() + extraValue.length() + 2 : 0); + final StringBuilder sb = new StringBuilder(estimatedSize); + + sb.append(NAME_LABEL).append(' ').append(sanitizeLabelValue(metricName)); + + boolean extraInserted = (extraKey == null); + for (final String[] attr : sortedAttrs) { + if (!extraInserted && attr[0].equals(extraKey)) { + continue; + } + if (!extraInserted && extraKey.compareTo(attr[0]) < 0) { + sb.append(' ').append(extraKey).append(' ').append(extraValue); + extraInserted = true; + } + sb.append(' ').append(attr[0]).append(' ').append(attr[1]); + } + if (!extraInserted) { + sb.append(' ').append(extraKey).append(' ').append(extraValue); + } + + return sb.toString(); + } + + private static String buildJsonDoc(final String labels, final long timestamp, final double value) { + final StringBuilder sb = new StringBuilder(labels.length() + 64); + sb.append("{\"labels\":\""); + appendJsonEscaped(sb, labels); + sb.append("\",\"timestamp\":"); + sb.append(timestamp); + sb.append(",\"value\":"); + sb.append(value); + sb.append('}'); + return sb.toString(); + } + + private static void appendJsonEscaped(final StringBuilder sb, final String s) { + for (int i = 0; i < s.length(); i++) { + final char c = s.charAt(i); + switch (c) { + case '"': + sb.append("\\\""); + break; + case '\\': + sb.append("\\\\"); + break; + case '\n': + sb.append("\\n"); + break; + case '\r': + sb.append("\\r"); + break; + case '\t': + sb.append("\\t"); + break; + default: + if (c < 0x20) { + sb.append("\\u00"); + sb.append(HEX_CHARS[(c >> 4) & 0xF]); + sb.append(HEX_CHARS[c & 0xF]); + } else { + sb.append(c); + } + } + } + } + + private static long parseTimestamp(final String isoTime) { + if (isoTime == null || isoTime.isEmpty()) { + LOG.warn("Metric has no timestamp, using current time"); + return System.currentTimeMillis(); + } + try { + return Instant.parse(isoTime).toEpochMilli(); + } catch (final Exception e) { + LOG.error("Failed to parse timestamp '{}', using current time", isoTime, e); + return System.currentTimeMillis(); + } + } + + private static String sanitizeLabelKey(final String key) { + if (key == null || key.isEmpty()) { + return "_"; + } + final StringBuilder sb = new StringBuilder(key.length() + 1); + final char first = key.charAt(0); + if (!(Character.isLetter(first) || first == '_')) { + sb.append('_'); + } + for (int i = 0; i < key.length(); i++) { + final char c = key.charAt(i); + sb.append(Character.isLetterOrDigit(c) || c == '_' ? c : '_'); + } + return sb.toString(); + } + + private static String sanitizeLabelValue(final String value) { + if (value == null) { + return ""; + } + return value.replace(' ', '_'); + } + + private static String formatDouble(final Double value) { + if (value == null) { + return "0"; + } + if (value == Double.POSITIVE_INFINITY) { + return "+Inf"; + } + if (value == (long) value.doubleValue()) { + return String.valueOf((long) value.doubleValue()); + } + return String.valueOf(value); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/resources/index-template/tsdb-index-template.json b/data-prepper-plugins/opensearch/src/main/resources/index-template/tsdb-index-template.json new file mode 100644 index 0000000000..40ea634a3a --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/resources/index-template/tsdb-index-template.json @@ -0,0 +1,33 @@ +{ + "version": 1, + "template": { + "mappings": { + "properties": { + "series_ref": { + "type": "long", + "doc_values": false + }, + "labels": { + "type": "keyword", + "ignore_above": 4096 + }, + "value": { + "type": "double", + "doc_values": false + }, + "timestamp": { + "type": "date", + "format": "epoch_millis" + }, + "timestamp_range": { + "type": "long_range" + } + } + }, + "settings": { + "index.translog.durability": "async", + "index.translog.sync_interval": "1s", + "refresh_interval": "1s" + } + } +} diff --git a/data-prepper-plugins/opensearch/src/main/resources/tsdb-index-template.json b/data-prepper-plugins/opensearch/src/main/resources/tsdb-index-template.json new file mode 100644 index 0000000000..bcab4e00b6 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/resources/tsdb-index-template.json @@ -0,0 +1,31 @@ +{ + "version": 1, + "mappings": { + "properties": { + "series_ref": { + "type": "long", + "doc_values": false + }, + "labels": { + "type": "keyword", + "ignore_above": 4096 + }, + "value": { + "type": "double", + "doc_values": false + }, + "timestamp": { + "type": "date", + "format": "epoch_millis" + }, + "timestamp_range": { + "type": "long_range" + } + } + }, + "settings": { + "index.translog.durability": "async", + "index.translog.sync_interval": "1s", + "refresh_interval": "1s" + } +} diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 627ba92d58..505de4372f 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -243,20 +243,18 @@ void test_initialization_with_failure_and_retry_with_query_manager() throws IOEx @Test void test_sink_successful_records_handling_without_forwarding_pipelines_bulk_operations_with_event_handles() throws Exception { when(sinkContext.getForwardToPipelines()).thenReturn(Map.of()); - final EventHandle eventHandle = mock(EventHandle.class); final OpenSearchSink objectUnderTest = createObjectUnderTest(); BulkOperationWrapper op1 = mock(BulkOperationWrapper.class); BulkOperationWrapper op2 = mock(BulkOperationWrapper.class); when(op1.getEvent()).thenReturn(null); when(op2.getEvent()).thenReturn(null); - when(op1.getEventHandle()).thenReturn(eventHandle); - when(op2.getEventHandle()).thenReturn(eventHandle); List operationsList = new ArrayList<>(); operationsList.add(op1); operationsList.add(op2); objectUnderTest.successfulOperationsHandler(operationsList); - verify(eventHandle, times(2)).release(eq(true)); - + verify(op1).releaseEventHandle(eq(true)); + verify(op2).releaseEventHandle(eq(true)); + } @Test diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java index a1c0a7067f..c7b6b19505 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexTypeTests.java @@ -33,11 +33,12 @@ public void getByValue() { assertEquals(Optional.of(IndexType.LOG_ANALYTICS_PLAIN), IndexType.getByValue("log-analytics-plain")); assertEquals(Optional.of(IndexType.METRIC_ANALYTICS), IndexType.getByValue("metric-analytics")); assertEquals(Optional.of(IndexType.METRIC_ANALYTICS_PLAIN), IndexType.getByValue("metric-analytics-plain")); + assertEquals(Optional.of(IndexType.TSDB), IndexType.getByValue("tsdb")); } @Test public void getIndexTypeValues() { - assertEquals("[trace-analytics-raw, trace-analytics-plain-raw, trace-analytics-service-map, otel-v2-apm-service-map, log-analytics, log-analytics-plain, metric-analytics, metric-analytics-plain, custom, management_disabled]", IndexType.getIndexTypeValues()); + assertEquals("[trace-analytics-raw, trace-analytics-plain-raw, trace-analytics-service-map, otel-v2-apm-service-map, log-analytics, log-analytics-plain, metric-analytics, metric-analytics-plain, tsdb, custom, management_disabled]", IndexType.getIndexTypeValues()); } @ParameterizedTest diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java new file mode 100644 index 0000000000..05686c3134 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java @@ -0,0 +1,417 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.metric.DefaultQuantile; +import org.opensearch.dataprepper.model.metric.JacksonGauge; +import org.opensearch.dataprepper.model.metric.JacksonHistogram; +import org.opensearch.dataprepper.model.metric.JacksonSum; +import org.opensearch.dataprepper.model.metric.JacksonSummary; +import org.opensearch.dataprepper.model.metric.Quantile; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TSDBDocumentBuilderTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String TEST_TIME = "2024-02-02T10:30:00Z"; + private static final long TEST_TIMESTAMP_MILLIS = 1706869800000L; + + private TSDBDocumentBuilder builder; + + @BeforeEach + void setUp() { + builder = new TSDBDocumentBuilder(); + } + + // --- Gauge Tests --- + + @Test + void build_gauge_returns_single_document() throws Exception { + final Map attributes = Map.of("host", "server-01"); + final JacksonGauge gauge = JacksonGauge.builder() + .withName("cpu_temp") + .withValue(72.5) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.build(gauge); + + assertEquals(1, docs.size()); + final Map doc = parseJson(docs.get(0)); + assertEquals("__name__ cpu_temp host server-01", doc.get("labels")); + assertEquals(TEST_TIMESTAMP_MILLIS, ((Number) doc.get("timestamp")).longValue()); + assertEquals(72.5, ((Number) doc.get("value")).doubleValue(), 0.001); + } + + @Test + void build_gauge_with_no_attributes() throws Exception { + final JacksonGauge gauge = JacksonGauge.builder() + .withName("memory_usage") + .withValue(85.0) + .withTime(TEST_TIME) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.build(gauge); + + assertEquals(1, docs.size()); + final Map doc = parseJson(docs.get(0)); + assertEquals("__name__ memory_usage", doc.get("labels")); + assertEquals(85.0, ((Number) doc.get("value")).doubleValue(), 0.001); + } + + // --- Sum Tests --- + + @Test + void build_monotonic_sum_adds_total_suffix() throws Exception { + final Map attributes = Map.of("method", "GET"); + final JacksonSum sum = JacksonSum.builder() + .withName("http_requests") + .withValue(100.0) + .withIsMonotonic(true) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("SUM") + .build(); + + final List docs = builder.build(sum); + + assertEquals(1, docs.size()); + final Map doc = parseJson(docs.get(0)); + assertEquals("__name__ http_requests_total method GET", doc.get("labels")); + assertEquals(100.0, ((Number) doc.get("value")).doubleValue(), 0.001); + } + + @Test + void build_non_monotonic_sum_no_total_suffix() throws Exception { + final JacksonSum sum = JacksonSum.builder() + .withName("temperature") + .withValue(22.5) + .withIsMonotonic(false) + .withTime(TEST_TIME) + .withEventKind("SUM") + .build(); + + final List docs = builder.build(sum); + + assertEquals(1, docs.size()); + final Map doc = parseJson(docs.get(0)); + assertEquals("__name__ temperature", doc.get("labels")); + } + + @Test + void build_monotonic_sum_already_has_total_suffix() throws Exception { + final JacksonSum sum = JacksonSum.builder() + .withName("http_requests_total") + .withValue(100.0) + .withIsMonotonic(true) + .withTime(TEST_TIME) + .withEventKind("SUM") + .build(); + + final List docs = builder.build(sum); + + assertEquals(1, docs.size()); + final Map doc = parseJson(docs.get(0)); + // Should NOT double-append _total + assertEquals("__name__ http_requests_total", doc.get("labels")); + } + + // --- Histogram Tests --- + + @Test + void build_histogram_returns_bucket_plus_count_plus_sum_documents() throws Exception { + final Map attributes = Map.of("method", "GET"); + final JacksonHistogram histogram = JacksonHistogram.builder() + .withName("request_duration") + .withSum(5.5) + .withCount(20L) + .withBucketCountsList(List.of(5L, 5L, 5L, 5L)) + .withExplicitBoundsList(List.of(0.1, 0.5, 1.0)) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("HISTOGRAM") + .build(); + + final List docs = builder.build(histogram); + + // 4 bucket docs + 1 _count + 1 _sum = 6 + assertEquals(6, docs.size()); + + // Bucket 1: cumulative = 5 + final Map bucket1 = parseJson(docs.get(0)); + assertEquals("__name__ request_duration_bucket le 0.1 method GET", bucket1.get("labels")); + assertEquals(5.0, ((Number) bucket1.get("value")).doubleValue(), 0.001); + + // Bucket 2: cumulative = 10 + final Map bucket2 = parseJson(docs.get(1)); + assertEquals("__name__ request_duration_bucket le 0.5 method GET", bucket2.get("labels")); + assertEquals(10.0, ((Number) bucket2.get("value")).doubleValue(), 0.001); + + // Bucket 3: cumulative = 15 + final Map bucket3 = parseJson(docs.get(2)); + assertEquals("__name__ request_duration_bucket le 1 method GET", bucket3.get("labels")); + assertEquals(15.0, ((Number) bucket3.get("value")).doubleValue(), 0.001); + + // Bucket 4 (+Inf): cumulative = 20 + final Map bucket4 = parseJson(docs.get(3)); + assertEquals("__name__ request_duration_bucket le +Inf method GET", bucket4.get("labels")); + assertEquals(20.0, ((Number) bucket4.get("value")).doubleValue(), 0.001); + + // _count document + final Map countDoc = parseJson(docs.get(4)); + assertEquals("__name__ request_duration_count method GET", countDoc.get("labels")); + assertEquals(20.0, ((Number) countDoc.get("value")).doubleValue(), 0.001); + + // _sum document + final Map sumDoc = parseJson(docs.get(5)); + assertEquals("__name__ request_duration_sum method GET", sumDoc.get("labels")); + assertEquals(5.5, ((Number) sumDoc.get("value")).doubleValue(), 0.001); + } + + @Test + void build_histogram_all_documents_have_same_timestamp() throws Exception { + final JacksonHistogram histogram = JacksonHistogram.builder() + .withName("latency") + .withSum(1.0) + .withCount(10L) + .withBucketCountsList(List.of(5L, 5L)) + .withExplicitBoundsList(List.of(0.5)) + .withTime(TEST_TIME) + .withEventKind("HISTOGRAM") + .build(); + + final List docs = builder.build(histogram); + + for (final String jsonDoc : docs) { + final Map doc = parseJson(jsonDoc); + assertEquals(TEST_TIMESTAMP_MILLIS, ((Number) doc.get("timestamp")).longValue()); + } + } + + // --- Summary Tests --- + + @Test + void build_summary_returns_quantile_plus_count_plus_sum_documents() throws Exception { + final Map attributes = Map.of("service", "api"); + final List quantiles = Arrays.asList( + new DefaultQuantile(0.5, 0.2), + new DefaultQuantile(0.99, 0.8) + ); + final JacksonSummary summary = JacksonSummary.builder() + .withName("rpc_latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(2) + .withCount(1000L) + .withSum(300.5) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("SUMMARY") + .build(); + + final List docs = builder.build(summary); + + // 2 quantile docs + 1 _count + 1 _sum = 4 + assertEquals(4, docs.size()); + + // Quantile 0.5 + final Map q1 = parseJson(docs.get(0)); + assertEquals("__name__ rpc_latency quantile 0.5 service api", q1.get("labels")); + assertEquals(0.2, ((Number) q1.get("value")).doubleValue(), 0.001); + + // Quantile 0.99 + final Map q2 = parseJson(docs.get(1)); + assertEquals("__name__ rpc_latency quantile 0.99 service api", q2.get("labels")); + assertEquals(0.8, ((Number) q2.get("value")).doubleValue(), 0.001); + + // _count + final Map countDoc = parseJson(docs.get(2)); + assertEquals("__name__ rpc_latency_count service api", countDoc.get("labels")); + assertEquals(1000.0, ((Number) countDoc.get("value")).doubleValue(), 0.001); + + // _sum + final Map sumDoc = parseJson(docs.get(3)); + assertEquals("__name__ rpc_latency_sum service api", sumDoc.get("labels")); + assertEquals(300.5, ((Number) sumDoc.get("value")).doubleValue(), 0.001); + } + + // --- Label Sorting Tests --- + + @Test + void labels_are_sorted_lexicographically() throws Exception { + final Map attributes = new HashMap<>(); + attributes.put("zone", "us-east"); + attributes.put("app", "myservice"); + attributes.put("host", "server-01"); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("cpu_temp") + .withValue(1.0) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.build(gauge); + final Map doc = parseJson(docs.get(0)); + + // Sorted: __name__, app, host, zone + assertEquals("__name__ cpu_temp app myservice host server-01 zone us-east", doc.get("labels")); + } + + // --- Label Sanitization Tests --- + + @Test + void label_values_with_spaces_are_sanitized() throws Exception { + final Map attributes = Map.of("handler", "/api/items with spaces"); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("requests") + .withValue(1.0) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.build(gauge); + final Map doc = parseJson(docs.get(0)); + + assertTrue(((String) doc.get("labels")).contains("handler /api/items_with_spaces")); + } + + @Test + void label_keys_with_invalid_chars_are_sanitized() throws Exception { + // key "123" should become "_123" (digit-first gets underscore prepended) + final Map attributes = Map.of("123", "val"); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("test") + .withValue(1.0) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.build(gauge); + final Map doc = parseJson(docs.get(0)); + + assertTrue(((String) doc.get("labels")).contains("_123 val")); + } + + @Test + void label_keys_with_dashes_are_sanitized() throws Exception { + final Map attributes = Map.of("key-with-dash", "val"); + + final JacksonGauge gauge = JacksonGauge.builder() + .withName("test") + .withValue(1.0) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("GAUGE") + .build(); + + final List docs = builder.build(gauge); + final Map doc = parseJson(docs.get(0)); + + assertTrue(((String) doc.get("labels")).contains("key_with_dash val")); + } + + @Test + void extra_label_is_merge_inserted_at_correct_sorted_position() throws Exception { + // "le" sorts between __name__ and "method" (l < m) + final Map attributes = new HashMap<>(); + attributes.put("method", "GET"); + attributes.put("region", "us-east"); + + final JacksonHistogram histogram = JacksonHistogram.builder() + .withName("duration") + .withSum(1.0) + .withCount(10L) + .withBucketCountsList(List.of(10L)) + .withExplicitBoundsList(List.of(0.5)) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("HISTOGRAM") + .build(); + + final List docs = builder.build(histogram); + final Map bucketDoc = parseJson(docs.get(0)); + + assertEquals("__name__ duration_bucket le 0.5 method GET region us-east", bucketDoc.get("labels")); + } + + @Test + void extra_label_sorting_after_all_attributes() throws Exception { + // "quantile" sorts after "app" (q > a) + final Map attributes = Map.of("app", "web"); + final List quantiles = Arrays.asList(new DefaultQuantile(0.99, 1.0)); + + final JacksonSummary summary = JacksonSummary.builder() + .withName("latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(1) + .withCount(1L) + .withSum(1.0) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("SUMMARY") + .build(); + + final List docs = builder.build(summary); + final Map quantileDoc = parseJson(docs.get(0)); + + assertEquals("__name__ latency app web quantile 0.99", quantileDoc.get("labels")); + } + + // --- Timestamp Tests --- + + @Test + void build_with_valid_timestamp() throws Exception { + final JacksonGauge gauge = JacksonGauge.builder() + .withName("test") + .withValue(1.0) + .withTime("2024-02-02T10:30:00Z") + .withEventKind("GAUGE") + .build(); + + final List docs = builder.build(gauge); + final Map doc = parseJson(docs.get(0)); + + assertEquals(TEST_TIMESTAMP_MILLIS, ((Number) doc.get("timestamp")).longValue()); + } + + // --- Non-Metric Event Test --- + + @Test + void build_throws_for_non_metric_event() { + final Event event = JacksonEvent.builder() + .withEventType("LOG") + .withData(Map.of("message", "hello")) + .build(); + + assertThrows(IllegalArgumentException.class, () -> builder.build(event)); + } + + private Map parseJson(final String json) throws Exception { + return OBJECT_MAPPER.readValue(json, new TypeReference>() {}); + } +} From d8c51ca25cc682a51b60289772c6b96c83cab119 Mon Sep 17 00:00:00 2001 From: Srikanth Padakanti Date: Tue, 31 Mar 2026 16:36:05 -0500 Subject: [PATCH 2/5] Fix NEGATIVE_INFINITY formatting, null password guard, and EventHandle placement Signed-off-by: Srikanth Padakanti --- .../plugins/sink/opensearch/OpenSearchSinkTsdbIT.java | 2 +- .../dataprepper/plugins/sink/opensearch/OpenSearchSink.java | 2 +- .../plugins/sink/opensearch/index/TSDBDocumentBuilder.java | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java index 2603d95657..6d1a49987f 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java @@ -467,7 +467,7 @@ private OpenSearchSinkConfig generateOpenSearchSinkConfig(final String indexType metadata.put("insecure", true); final String user = System.getProperty("tests.opensearch.user"); final String password = System.getProperty("tests.opensearch.password"); - if (user != null) { + if (user != null && password != null) { metadata.put(AUTHENTICATION, Map.of(USERNAME, user, PASSWORD, password)); } final String distributionVersion = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 02fc903500..1c31c4414a 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -398,7 +398,7 @@ public void doOutput(final Collection> records) { for (int i = 0; i < tsdbDocs.size(); i++) { final SerializedJson doc = SerializedJson.fromStringAndOptionals(tsdbDocs.get(i), null, null, null); final BulkOperation op = getBulkOperationForAction(tsdbAction, doc, null, indexName, null); - final BulkOperationWrapper wrapper = (i == 0) + final BulkOperationWrapper wrapper = (i == tsdbDocs.size() - 1) ? new BulkOperationWrapper(op, event.getEventHandle(), null, null) : new BulkOperationWrapper(op, (EventHandle) null, null, null); wrappers.add(wrapper); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java index bbb9830fdd..f80b689094 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java @@ -252,6 +252,9 @@ private static String formatDouble(final Double value) { if (value == Double.POSITIVE_INFINITY) { return "+Inf"; } + if (value == Double.NEGATIVE_INFINITY) { + return "-Inf"; + } if (value == (long) value.doubleValue()) { return String.valueOf((long) value.doubleValue()); } From ac1a537947aca3011bd156ce1ff02ef7296114ce Mon Sep 17 00:00:00 2001 From: Srikanth Padakanti Date: Wed, 1 Apr 2026 11:25:48 -0500 Subject: [PATCH 3/5] Add full license headers to new TSDB files Signed-off-by: Srikanth Padakanti --- .../plugins/sink/opensearch/OpenSearchSinkTsdbIT.java | 4 ++++ .../plugins/sink/opensearch/index/TSDBDocumentBuilder.java | 4 ++++ .../sink/opensearch/index/TSDBDocumentBuilderTest.java | 4 ++++ 3 files changed, 12 insertions(+) diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java index 6d1a49987f..452f7e4669 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugins.sink.opensearch; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java index f80b689094..399fb491df 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugins.sink.opensearch.index; diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java index 05686c3134..75aa01654b 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java @@ -1,6 +1,10 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. */ package org.opensearch.dataprepper.plugins.sink.opensearch.index; From 15c537d95e740982bb4fc85ca91b992db61f6979 Mon Sep 17 00:00:00 2001 From: Srikanth Padakanti Date: Thu, 9 Apr 2026 11:02:32 -0500 Subject: [PATCH 4/5] Address code review feedback for TSDB support Signed-off-by: Srikanth Padakanti --- .../sink/opensearch/OpenSearchSinkTsdbIT.java | 26 -------- .../sink/opensearch/OpenSearchSink.java | 30 +++++---- .../index/CustomDocumentBuilder.java | 23 +++++++ .../index/CustomDocumentBuilderFactory.java | 20 ++++++ .../opensearch/index/IndexManagerFactory.java | 1 - .../opensearch/index/TSDBDocumentBuilder.java | 57 +++++----------- .../index/TSDBDocumentBuilderTest.java | 65 +++++-------------- 7 files changed, 90 insertions(+), 132 deletions(-) create mode 100644 data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilder.java create mode 100644 data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilderFactory.java diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java index 452f7e4669..9b6a3ba01a 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTsdbIT.java @@ -153,7 +153,6 @@ void closeClient() throws IOException { } } - // --- Index Initialization --- @Test @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") @@ -169,7 +168,6 @@ void testInstantiateSinkTsdbDefault() throws IOException { final Response response = client.performRequest(request); assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); - // Query mappings via alias (TSDB uses NoIsmPolicyManagement so index name may not follow -000001 pattern) final String extraURI = DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo( OpenSearchIntegrationHelper.getVersion()) >= 0 ? INCLUDE_TYPE_NAME_FALSE_URI : ""; final Request mappingRequest = new Request(HttpMethod.GET, indexAlias + "/_mappings" + extraURI); @@ -182,7 +180,6 @@ void testInstantiateSinkTsdbDefault() throws IOException { final Map mappings = (Map) ((Map) mappingResult.get(actualIndex)).get("mappings"); assertThat(mappings, notNullValue()); - // Verify TSDB-specific mapping fields @SuppressWarnings("unchecked") final Map properties = (Map) mappings.get("properties"); assertThat(properties, notNullValue()); @@ -190,11 +187,8 @@ void testInstantiateSinkTsdbDefault() throws IOException { assertThat(properties.containsKey("timestamp"), equalTo(true)); assertThat(properties.containsKey("value"), equalTo(true)); - // TSDB uses NoIsmPolicyManagement — no ISM policy should be attached - // (unlike metric-analytics which has ISM) } - // --- Gauge Output --- @Test @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") @@ -222,7 +216,6 @@ void testOutputGauge() throws IOException, InterruptedException { assertThat(((Number) doc.get("value")).doubleValue(), closeTo(72.5, 0.001)); assertThat(doc.get("timestamp"), notNullValue()); - // Verify metrics final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); @@ -230,7 +223,6 @@ void testOutputGauge() throws IOException, InterruptedException { Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); } - // --- Sum (Counter) Output --- @Test @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") @@ -255,11 +247,9 @@ void testOutputMonotonicSum() throws IOException, InterruptedException { assertThat(sources, hasSize(1)); final Map doc = sources.get(0); - // Monotonic sum should have _total suffix assertThat(doc.get("labels"), equalTo("__name__ http_requests_total method GET")); assertThat(((Number) doc.get("value")).doubleValue(), closeTo(100.0, 0.001)); - // Verify metrics final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); @@ -267,7 +257,6 @@ void testOutputMonotonicSum() throws IOException, InterruptedException { Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); } - // --- Histogram Expansion --- @Test @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") @@ -291,20 +280,16 @@ void testOutputHistogramExpansion() throws IOException, InterruptedException { final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); final List> sources = getSearchResponseDocSources(indexAlias); - // 4 bucket docs + 1 _count + 1 _sum = 6 assertThat(sources, hasSize(6)); - // Collect labels for verification final List labels = sources.stream() .map(s -> (String) s.get("labels")) .collect(Collectors.toList()); - // Verify bucket docs exist with cumulative counts assertThat(labels.stream().filter(l -> l.contains("request_duration_bucket")).count(), equalTo(4L)); assertThat(labels.stream().filter(l -> l.contains("request_duration_count")).count(), equalTo(1L)); assertThat(labels.stream().filter(l -> l.contains("request_duration_sum")).count(), equalTo(1L)); - // Verify cumulative bucket values final Map labelToValue = sources.stream() .collect(Collectors.toMap(s -> (String) s.get("labels"), s -> ((Number) s.get("value")).doubleValue())); @@ -315,14 +300,12 @@ void testOutputHistogramExpansion() throws IOException, InterruptedException { assertThat(labelToValue.get("__name__ request_duration_count method GET"), closeTo(20.0, 0.001)); assertThat(labelToValue.get("__name__ request_duration_sum method GET"), closeTo(5.5, 0.001)); - // Verify document success count matches expanded document count final List documentsSuccess = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(BulkRetryStrategy.DOCUMENTS_SUCCESS).toString()); assertThat(documentsSuccess.size(), equalTo(1)); assertThat(documentsSuccess.get(0).getValue(), closeTo(6.0, 0)); - // Verify no errors final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); @@ -330,7 +313,6 @@ void testOutputHistogramExpansion() throws IOException, InterruptedException { Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); } - // --- Summary Expansion --- @Test @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") @@ -358,7 +340,6 @@ void testOutputSummaryExpansion() throws IOException, InterruptedException { final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); final List> sources = getSearchResponseDocSources(indexAlias); - // 2 quantile docs + 1 _count + 1 _sum = 4 assertThat(sources, hasSize(4)); final Map labelToValue = sources.stream() @@ -369,7 +350,6 @@ void testOutputSummaryExpansion() throws IOException, InterruptedException { assertThat(labelToValue.get("__name__ rpc_latency_count service api"), closeTo(1000.0, 0.001)); assertThat(labelToValue.get("__name__ rpc_latency_sum service api"), closeTo(300.5, 0.001)); - // Verify metrics final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); @@ -377,7 +357,6 @@ void testOutputSummaryExpansion() throws IOException, InterruptedException { Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); } - // --- Multiple Metrics in Single Batch --- @Test @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") @@ -408,10 +387,8 @@ void testOutputMixedMetricTypes() throws IOException, InterruptedException { final String indexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TSDB); final List> sources = getSearchResponseDocSources(indexAlias); - // 1 gauge + 1 sum = 2 documents assertThat(sources, hasSize(2)); - // Verify metrics final List bulkRequestErrors = MetricsTestUtil.getMeasurementList( new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) .add(OpenSearchSink.BULKREQUEST_ERRORS).toString()); @@ -419,7 +396,6 @@ void testOutputMixedMetricTypes() throws IOException, InterruptedException { Assert.assertEquals(0.0, bulkRequestErrors.get(0).getValue(), 0); } - // --- Re-instantiation (no duplicate index) --- @Test @DisabledIf(value = "isES6", disabledReason = "TSDB is not supported for ES 6") @@ -433,7 +409,6 @@ void testReinstantiateSinkDoesNotCreateDuplicateIndex() throws IOException { Response response = client.performRequest(request); assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); - // Reinstantiate sink — should not fail createObjectUnderTest(config, true); request = new Request(HttpMethod.HEAD, indexAlias); @@ -441,7 +416,6 @@ void testReinstantiateSinkDoesNotCreateDuplicateIndex() throws IOException { assertThat(response.getStatusLine().getStatusCode(), equalTo(SC_OK)); } - // --- Helper methods --- private OpenSearchSink createObjectUnderTest(final OpenSearchSinkConfig openSearchSinkConfig, final boolean doInitialize) { final SinkContext sinkContext = mock(SinkContext.class); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 1c31c4414a..37671aa3b4 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -33,6 +33,7 @@ import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException; import org.opensearch.dataprepper.model.failures.DlqObject; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; @@ -73,7 +74,8 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapper; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; -import org.opensearch.dataprepper.plugins.sink.opensearch.index.TSDBDocumentBuilder; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.CustomDocumentBuilder; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.CustomDocumentBuilderFactory; import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ServerlessOptions; import org.slf4j.Logger; @@ -170,7 +172,7 @@ public class OpenSearchSink extends AbstractSink> { private ExistingDocumentQueryManager existingDocumentQueryManager; - private final TSDBDocumentBuilder tsdbDocumentBuilder; + private final CustomDocumentBuilder customDocumentBuilder; private final ExecutorService queryExecutorService; @@ -222,7 +224,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.pluginConfigObservable = pluginConfigObservable; this.objectMapper = new ObjectMapper(); this.bulkOperationFactory = new BulkOperationFactory(versionType, scriptManager, objectMapper, isUsingDocumentFilters()); - this.tsdbDocumentBuilder = (this.indexType == IndexType.TSDB) ? new TSDBDocumentBuilder() : null; + this.customDocumentBuilder = new CustomDocumentBuilderFactory().create(this.indexType); this.queryExecutorService = openSearchSinkConfig.getIndexConfiguration().getQueryTerm() != null ? Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("existing-document-query-manager")) : null; @@ -390,20 +392,20 @@ public void doOutput(final Collection> records) { dataStreamIndex.ensureTimestamp(event, indexName); - if (indexType == IndexType.TSDB) { + if (customDocumentBuilder != null) { try { - final List tsdbDocs = tsdbDocumentBuilder.build(event); + final List tsdbDocs = customDocumentBuilder.buildDocuments(event); final String tsdbAction = resolveEventAction(event); - final List wrappers = new ArrayList<>(tsdbDocs.size()); - for (int i = 0; i < tsdbDocs.size(); i++) { - final SerializedJson doc = SerializedJson.fromStringAndOptionals(tsdbDocs.get(i), null, null, null); - final BulkOperation op = getBulkOperationForAction(tsdbAction, doc, null, indexName, null); - final BulkOperationWrapper wrapper = (i == tsdbDocs.size() - 1) - ? new BulkOperationWrapper(op, event.getEventHandle(), null, null) - : new BulkOperationWrapper(op, (EventHandle) null, null, null); - wrappers.add(wrapper); + final EventHandle eventHandle = event.getEventHandle(); + if (tsdbDocs.size() > 1 && eventHandle instanceof InternalEventHandle) { + for (int i = 0; i < tsdbDocs.size() - 1; i++) { + ((InternalEventHandle) eventHandle).acquireReference(); + } } - for (final BulkOperationWrapper wrapper : wrappers) { + for (final String tsdbDoc : tsdbDocs) { + final SerializedJson doc = SerializedJson.fromStringAndOptionals(tsdbDoc, null, null, null); + final BulkOperation op = bulkOperationFactory.create(tsdbAction, doc, null, indexName, null); + final BulkOperationWrapper wrapper = new BulkOperationWrapper(op, eventHandle, null, null); bulkRequest = flushBatch(bulkRequest, wrapper, lastFlushTime); bulkRequest.addOperation(wrapper); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilder.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilder.java new file mode 100644 index 0000000000..20f268bab4 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilder.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +import org.opensearch.dataprepper.model.event.Event; + +import java.util.List; + +/** + * Converts a Data Prepper {@link Event} into one or more JSON document strings + * suitable for indexing into OpenSearch. Implementations handle index-type-specific + * document transformations such as TSDB metric expansion. + */ +public interface CustomDocumentBuilder { + List buildDocuments(Event event); +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilderFactory.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilderFactory.java new file mode 100644 index 0000000000..edf85a58c0 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/CustomDocumentBuilderFactory.java @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.index; + +public final class CustomDocumentBuilderFactory { + + public CustomDocumentBuilder create(final IndexType indexType) { + if (indexType == IndexType.TSDB) { + return new TSDBDocumentBuilder(); + } + return null; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java index 0acacfc458..3781bd9b06 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexManagerFactory.java @@ -56,7 +56,6 @@ public final IndexManager getIndexManager(final IndexType indexType, restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias); break; case TRACE_ANALYTICS_SERVICE_MAP: - case TSDB: indexManager = new TraceAnalyticsServiceMapIndexManager( restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias); break; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java index 399fb491df..236d9bcb75 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java @@ -9,6 +9,9 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.index; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.metric.Gauge; import org.opensearch.dataprepper.model.metric.Histogram; @@ -27,13 +30,14 @@ import java.util.List; import java.util.Map; -public final class TSDBDocumentBuilder { +public final class TSDBDocumentBuilder implements CustomDocumentBuilder { private static final Logger LOG = LoggerFactory.getLogger(TSDBDocumentBuilder.class); private static final String NAME_LABEL = "__name__"; - private static final char[] HEX_CHARS = "0123456789abcdef".toCharArray(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public List build(final Event event) { + @Override + public List buildDocuments(final Event event) { if (!(event instanceof Metric)) { throw new IllegalArgumentException( "TSDB index_type requires Metric events. Received: " + event.getClass().getName()); @@ -171,45 +175,14 @@ private static String buildLabels(final String metricName, final String[][] sort } private static String buildJsonDoc(final String labels, final long timestamp, final double value) { - final StringBuilder sb = new StringBuilder(labels.length() + 64); - sb.append("{\"labels\":\""); - appendJsonEscaped(sb, labels); - sb.append("\",\"timestamp\":"); - sb.append(timestamp); - sb.append(",\"value\":"); - sb.append(value); - sb.append('}'); - return sb.toString(); - } - - private static void appendJsonEscaped(final StringBuilder sb, final String s) { - for (int i = 0; i < s.length(); i++) { - final char c = s.charAt(i); - switch (c) { - case '"': - sb.append("\\\""); - break; - case '\\': - sb.append("\\\\"); - break; - case '\n': - sb.append("\\n"); - break; - case '\r': - sb.append("\\r"); - break; - case '\t': - sb.append("\\t"); - break; - default: - if (c < 0x20) { - sb.append("\\u00"); - sb.append(HEX_CHARS[(c >> 4) & 0xF]); - sb.append(HEX_CHARS[c & 0xF]); - } else { - sb.append(c); - } - } + final ObjectNode node = OBJECT_MAPPER.createObjectNode(); + node.put("labels", labels); + node.put("timestamp", timestamp); + node.put("value", value); + try { + return OBJECT_MAPPER.writeValueAsString(node); + } catch (final JsonProcessingException e) { + throw new RuntimeException("Failed to serialize TSDB document", e); } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java index 75aa01654b..30a1fa6444 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java @@ -44,8 +44,6 @@ void setUp() { builder = new TSDBDocumentBuilder(); } - // --- Gauge Tests --- - @Test void build_gauge_returns_single_document() throws Exception { final Map attributes = Map.of("host", "server-01"); @@ -57,7 +55,7 @@ void build_gauge_returns_single_document() throws Exception { .withEventKind("GAUGE") .build(); - final List docs = builder.build(gauge); + final List docs = builder.buildDocuments(gauge); assertEquals(1, docs.size()); final Map doc = parseJson(docs.get(0)); @@ -75,7 +73,7 @@ void build_gauge_with_no_attributes() throws Exception { .withEventKind("GAUGE") .build(); - final List docs = builder.build(gauge); + final List docs = builder.buildDocuments(gauge); assertEquals(1, docs.size()); final Map doc = parseJson(docs.get(0)); @@ -83,8 +81,6 @@ void build_gauge_with_no_attributes() throws Exception { assertEquals(85.0, ((Number) doc.get("value")).doubleValue(), 0.001); } - // --- Sum Tests --- - @Test void build_monotonic_sum_adds_total_suffix() throws Exception { final Map attributes = Map.of("method", "GET"); @@ -97,7 +93,7 @@ void build_monotonic_sum_adds_total_suffix() throws Exception { .withEventKind("SUM") .build(); - final List docs = builder.build(sum); + final List docs = builder.buildDocuments(sum); assertEquals(1, docs.size()); final Map doc = parseJson(docs.get(0)); @@ -115,7 +111,7 @@ void build_non_monotonic_sum_no_total_suffix() throws Exception { .withEventKind("SUM") .build(); - final List docs = builder.build(sum); + final List docs = builder.buildDocuments(sum); assertEquals(1, docs.size()); final Map doc = parseJson(docs.get(0)); @@ -132,16 +128,13 @@ void build_monotonic_sum_already_has_total_suffix() throws Exception { .withEventKind("SUM") .build(); - final List docs = builder.build(sum); + final List docs = builder.buildDocuments(sum); assertEquals(1, docs.size()); final Map doc = parseJson(docs.get(0)); - // Should NOT double-append _total assertEquals("__name__ http_requests_total", doc.get("labels")); } - // --- Histogram Tests --- - @Test void build_histogram_returns_bucket_plus_count_plus_sum_documents() throws Exception { final Map attributes = Map.of("method", "GET"); @@ -156,37 +149,30 @@ void build_histogram_returns_bucket_plus_count_plus_sum_documents() throws Excep .withEventKind("HISTOGRAM") .build(); - final List docs = builder.build(histogram); + final List docs = builder.buildDocuments(histogram); - // 4 bucket docs + 1 _count + 1 _sum = 6 assertEquals(6, docs.size()); - // Bucket 1: cumulative = 5 final Map bucket1 = parseJson(docs.get(0)); assertEquals("__name__ request_duration_bucket le 0.1 method GET", bucket1.get("labels")); assertEquals(5.0, ((Number) bucket1.get("value")).doubleValue(), 0.001); - // Bucket 2: cumulative = 10 final Map bucket2 = parseJson(docs.get(1)); assertEquals("__name__ request_duration_bucket le 0.5 method GET", bucket2.get("labels")); assertEquals(10.0, ((Number) bucket2.get("value")).doubleValue(), 0.001); - // Bucket 3: cumulative = 15 final Map bucket3 = parseJson(docs.get(2)); assertEquals("__name__ request_duration_bucket le 1 method GET", bucket3.get("labels")); assertEquals(15.0, ((Number) bucket3.get("value")).doubleValue(), 0.001); - // Bucket 4 (+Inf): cumulative = 20 final Map bucket4 = parseJson(docs.get(3)); assertEquals("__name__ request_duration_bucket le +Inf method GET", bucket4.get("labels")); assertEquals(20.0, ((Number) bucket4.get("value")).doubleValue(), 0.001); - // _count document final Map countDoc = parseJson(docs.get(4)); assertEquals("__name__ request_duration_count method GET", countDoc.get("labels")); assertEquals(20.0, ((Number) countDoc.get("value")).doubleValue(), 0.001); - // _sum document final Map sumDoc = parseJson(docs.get(5)); assertEquals("__name__ request_duration_sum method GET", sumDoc.get("labels")); assertEquals(5.5, ((Number) sumDoc.get("value")).doubleValue(), 0.001); @@ -204,7 +190,7 @@ void build_histogram_all_documents_have_same_timestamp() throws Exception { .withEventKind("HISTOGRAM") .build(); - final List docs = builder.build(histogram); + final List docs = builder.buildDocuments(histogram); for (final String jsonDoc : docs) { final Map doc = parseJson(jsonDoc); @@ -212,8 +198,6 @@ void build_histogram_all_documents_have_same_timestamp() throws Exception { } } - // --- Summary Tests --- - @Test void build_summary_returns_quantile_plus_count_plus_sum_documents() throws Exception { final Map attributes = Map.of("service", "api"); @@ -232,34 +216,27 @@ void build_summary_returns_quantile_plus_count_plus_sum_documents() throws Excep .withEventKind("SUMMARY") .build(); - final List docs = builder.build(summary); + final List docs = builder.buildDocuments(summary); - // 2 quantile docs + 1 _count + 1 _sum = 4 assertEquals(4, docs.size()); - // Quantile 0.5 final Map q1 = parseJson(docs.get(0)); assertEquals("__name__ rpc_latency quantile 0.5 service api", q1.get("labels")); assertEquals(0.2, ((Number) q1.get("value")).doubleValue(), 0.001); - // Quantile 0.99 final Map q2 = parseJson(docs.get(1)); assertEquals("__name__ rpc_latency quantile 0.99 service api", q2.get("labels")); assertEquals(0.8, ((Number) q2.get("value")).doubleValue(), 0.001); - // _count final Map countDoc = parseJson(docs.get(2)); assertEquals("__name__ rpc_latency_count service api", countDoc.get("labels")); assertEquals(1000.0, ((Number) countDoc.get("value")).doubleValue(), 0.001); - // _sum final Map sumDoc = parseJson(docs.get(3)); assertEquals("__name__ rpc_latency_sum service api", sumDoc.get("labels")); assertEquals(300.5, ((Number) sumDoc.get("value")).doubleValue(), 0.001); } - // --- Label Sorting Tests --- - @Test void labels_are_sorted_lexicographically() throws Exception { final Map attributes = new HashMap<>(); @@ -275,15 +252,12 @@ void labels_are_sorted_lexicographically() throws Exception { .withEventKind("GAUGE") .build(); - final List docs = builder.build(gauge); + final List docs = builder.buildDocuments(gauge); final Map doc = parseJson(docs.get(0)); - // Sorted: __name__, app, host, zone assertEquals("__name__ cpu_temp app myservice host server-01 zone us-east", doc.get("labels")); } - // --- Label Sanitization Tests --- - @Test void label_values_with_spaces_are_sanitized() throws Exception { final Map attributes = Map.of("handler", "/api/items with spaces"); @@ -296,7 +270,7 @@ void label_values_with_spaces_are_sanitized() throws Exception { .withEventKind("GAUGE") .build(); - final List docs = builder.build(gauge); + final List docs = builder.buildDocuments(gauge); final Map doc = parseJson(docs.get(0)); assertTrue(((String) doc.get("labels")).contains("handler /api/items_with_spaces")); @@ -304,7 +278,6 @@ void label_values_with_spaces_are_sanitized() throws Exception { @Test void label_keys_with_invalid_chars_are_sanitized() throws Exception { - // key "123" should become "_123" (digit-first gets underscore prepended) final Map attributes = Map.of("123", "val"); final JacksonGauge gauge = JacksonGauge.builder() @@ -315,7 +288,7 @@ void label_keys_with_invalid_chars_are_sanitized() throws Exception { .withEventKind("GAUGE") .build(); - final List docs = builder.build(gauge); + final List docs = builder.buildDocuments(gauge); final Map doc = parseJson(docs.get(0)); assertTrue(((String) doc.get("labels")).contains("_123 val")); @@ -333,7 +306,7 @@ void label_keys_with_dashes_are_sanitized() throws Exception { .withEventKind("GAUGE") .build(); - final List docs = builder.build(gauge); + final List docs = builder.buildDocuments(gauge); final Map doc = parseJson(docs.get(0)); assertTrue(((String) doc.get("labels")).contains("key_with_dash val")); @@ -341,7 +314,6 @@ void label_keys_with_dashes_are_sanitized() throws Exception { @Test void extra_label_is_merge_inserted_at_correct_sorted_position() throws Exception { - // "le" sorts between __name__ and "method" (l < m) final Map attributes = new HashMap<>(); attributes.put("method", "GET"); attributes.put("region", "us-east"); @@ -357,7 +329,7 @@ void extra_label_is_merge_inserted_at_correct_sorted_position() throws Exception .withEventKind("HISTOGRAM") .build(); - final List docs = builder.build(histogram); + final List docs = builder.buildDocuments(histogram); final Map bucketDoc = parseJson(docs.get(0)); assertEquals("__name__ duration_bucket le 0.5 method GET region us-east", bucketDoc.get("labels")); @@ -365,7 +337,6 @@ void extra_label_is_merge_inserted_at_correct_sorted_position() throws Exception @Test void extra_label_sorting_after_all_attributes() throws Exception { - // "quantile" sorts after "app" (q > a) final Map attributes = Map.of("app", "web"); final List quantiles = Arrays.asList(new DefaultQuantile(0.99, 1.0)); @@ -380,14 +351,12 @@ void extra_label_sorting_after_all_attributes() throws Exception { .withEventKind("SUMMARY") .build(); - final List docs = builder.build(summary); + final List docs = builder.buildDocuments(summary); final Map quantileDoc = parseJson(docs.get(0)); assertEquals("__name__ latency app web quantile 0.99", quantileDoc.get("labels")); } - // --- Timestamp Tests --- - @Test void build_with_valid_timestamp() throws Exception { final JacksonGauge gauge = JacksonGauge.builder() @@ -397,14 +366,12 @@ void build_with_valid_timestamp() throws Exception { .withEventKind("GAUGE") .build(); - final List docs = builder.build(gauge); + final List docs = builder.buildDocuments(gauge); final Map doc = parseJson(docs.get(0)); assertEquals(TEST_TIMESTAMP_MILLIS, ((Number) doc.get("timestamp")).longValue()); } - // --- Non-Metric Event Test --- - @Test void build_throws_for_non_metric_event() { final Event event = JacksonEvent.builder() @@ -412,7 +379,7 @@ void build_throws_for_non_metric_event() { .withData(Map.of("message", "hello")) .build(); - assertThrows(IllegalArgumentException.class, () -> builder.build(event)); + assertThrows(IllegalArgumentException.class, () -> builder.buildDocuments(event)); } private Map parseJson(final String json) throws Exception { From fd174fc135b8f15d53a17e209d0d7a181ad6f19b Mon Sep 17 00:00:00 2001 From: Srikanth Padakanti Date: Wed, 22 Apr 2026 19:01:09 -0500 Subject: [PATCH 5/5] Skip NaN and infinite quantile values in TSDBDocumentBuilder Signed-off-by: Srikanth Padakanti --- .../opensearch/index/TSDBDocumentBuilder.java | 6 +- .../index/TSDBDocumentBuilderTest.java | 81 +++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java index 236d9bcb75..e994d50bdf 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilder.java @@ -111,8 +111,12 @@ private static List buildSummary(final Summary summary, final String[][] if (quantiles != null) { for (final Quantile q : quantiles) { + final double qValue = q.getValue() != null ? q.getValue() : 0.0; + if (!Double.isFinite(qValue)) { + continue; + } final String labels = buildLabels(baseName, sortedAttrs, "quantile", formatDouble(q.getQuantile())); - documents.add(buildJsonDoc(labels, timestamp, q.getValue() != null ? q.getValue() : 0.0)); + documents.add(buildJsonDoc(labels, timestamp, qValue)); } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java index 30a1fa6444..087ed46718 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/TSDBDocumentBuilderTest.java @@ -382,6 +382,87 @@ void build_throws_for_non_metric_event() { assertThrows(IllegalArgumentException.class, () -> builder.buildDocuments(event)); } + @Test + void build_summary_skips_quantiles_with_NaN_values() throws Exception { + final Map attributes = Map.of("service", "api"); + final List quantiles = Arrays.asList( + new DefaultQuantile(0.5, Double.NaN), + new DefaultQuantile(0.99, 0.8) + ); + final JacksonSummary summary = JacksonSummary.builder() + .withName("rpc_latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(2) + .withCount(1000L) + .withSum(300.5) + .withTime(TEST_TIME) + .withAttributes(attributes) + .withEventKind("SUMMARY") + .build(); + + final List docs = builder.buildDocuments(summary); + + assertEquals(3, docs.size()); + + final Map q1 = parseJson(docs.get(0)); + assertEquals("__name__ rpc_latency quantile 0.99 service api", q1.get("labels")); + assertEquals(0.8, ((Number) q1.get("value")).doubleValue(), 0.001); + + final Map countDoc = parseJson(docs.get(1)); + assertEquals("__name__ rpc_latency_count service api", countDoc.get("labels")); + + final Map sumDoc = parseJson(docs.get(2)); + assertEquals("__name__ rpc_latency_sum service api", sumDoc.get("labels")); + } + + @Test + void build_summary_skips_all_quantiles_when_all_NaN() throws Exception { + final List quantiles = Arrays.asList( + new DefaultQuantile(0.5, Double.NaN), + new DefaultQuantile(0.99, Double.NaN) + ); + final JacksonSummary summary = JacksonSummary.builder() + .withName("rpc_latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(2) + .withCount(0L) + .withSum(0.0) + .withTime(TEST_TIME) + .withAttributes(Map.of()) + .withEventKind("SUMMARY") + .build(); + + final List docs = builder.buildDocuments(summary); + + assertEquals(2, docs.size()); + assertTrue(parseJson(docs.get(0)).get("labels").toString().contains("_count")); + assertTrue(parseJson(docs.get(1)).get("labels").toString().contains("_sum")); + } + + @Test + void build_summary_skips_quantiles_with_infinite_values() throws Exception { + final List quantiles = Arrays.asList( + new DefaultQuantile(0.5, Double.POSITIVE_INFINITY), + new DefaultQuantile(0.99, 0.5) + ); + final JacksonSummary summary = JacksonSummary.builder() + .withName("latency") + .withQuantiles(quantiles) + .withQuantilesValueCount(2) + .withCount(100L) + .withSum(50.0) + .withTime(TEST_TIME) + .withAttributes(Map.of()) + .withEventKind("SUMMARY") + .build(); + + final List docs = builder.buildDocuments(summary); + + assertEquals(3, docs.size()); + final Map q1 = parseJson(docs.get(0)); + assertEquals(0.5, ((Number) q1.get("value")).doubleValue(), 0.001); + } + private Map parseJson(final String json) throws Exception { return OBJECT_MAPPER.readValue(json, new TypeReference>() {}); }