diff --git a/data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusMetricUtils.java b/data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusMetricUtils.java new file mode 100644 index 0000000000..eef3ba4ead --- /dev/null +++ b/data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusMetricUtils.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.prometheus; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +final class PrometheusMetricUtils { + + private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricUtils.class); + + static final String AGGREGATION_TEMPORALITY_CUMULATIVE = "AGGREGATION_TEMPORALITY_CUMULATIVE"; + + private static final String TOTAL_SUFFIX = "_total"; + private static final String CREATED_SUFFIX = "_created"; + private static final String SERVICE_NAME_LABEL = "service.name"; + private static final String SERVICE_NAME_UNDERSCORE_LABEL = "service_name"; + private static final String JOB_LABEL = "job"; + + private PrometheusMetricUtils() { + } + + static String extractServiceName(final Map attributes) { + if (attributes.containsKey(SERVICE_NAME_LABEL)) { + return (String) attributes.get(SERVICE_NAME_LABEL); + } + if (attributes.containsKey(SERVICE_NAME_UNDERSCORE_LABEL)) { + return (String) attributes.get(SERVICE_NAME_UNDERSCORE_LABEL); + } + if (attributes.containsKey(JOB_LABEL)) { + return (String) attributes.get(JOB_LABEL); + } + return ""; + } + + static String stripCounterSuffix(final String metricName) { + if (metricName.endsWith(TOTAL_SUFFIX)) { + return metricName.substring(0, metricName.length() - TOTAL_SUFFIX.length()); + } + if (metricName.endsWith(CREATED_SUFFIX)) { + return metricName.substring(0, metricName.length() - CREATED_SUFFIX.length()); + } + return metricName; + } + + static Double parseLeValue(final String leValue) { + if (leValue == null) { + return null; + } + if ("+Inf".equals(leValue)) { + return Double.POSITIVE_INFINITY; + } + if ("-Inf".equals(leValue)) { + return Double.NEGATIVE_INFINITY; + } + try { + return Double.parseDouble(leValue); + } catch (final NumberFormatException e) { + LOG.warn("Skipping histogram bucket with unparseable le value: '{}'", leValue); + return null; + } + } + + static Double parseQuantileValue(final String quantileValue) { + if (quantileValue == null) { + return null; + } + try { + return Double.parseDouble(quantileValue); + } catch (final NumberFormatException e) { + LOG.warn("Skipping summary quantile with unparseable value: '{}'", quantileValue); + return null; + } + } +} diff --git a/data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/RemoteWriteProtobufParser.java b/data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/RemoteWriteProtobufParser.java index af20ccb5d7..70a6a2f6d1 100644 --- a/data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/RemoteWriteProtobufParser.java +++ b/data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/RemoteWriteProtobufParser.java @@ -75,9 +75,6 @@ public class RemoteWriteProtobufParser { private static final String SUM_SUFFIX = "_sum"; private static final String TOTAL_SUFFIX = "_total"; private static final String CREATED_SUFFIX = "_created"; - private static final String SERVICE_NAME_LABEL = "service.name"; - private static final String SERVICE_NAME_UNDERSCORE_LABEL = "service_name"; - private static final String JOB_LABEL = "job"; private final PrometheusRemoteWriteSourceConfig config; @@ -284,13 +281,13 @@ private List> convertHistogramGroup(final HistogramGroup group) { final List> records = new ArrayList<>(); final Map commonAttributes = new HashMap<>(group.buckets.get(0).labels.commonLabels); - final String serviceName = extractServiceName(commonAttributes); + final String serviceName = PrometheusMetricUtils.extractServiceName(commonAttributes); final Instant timeReceived = Instant.now(); for (final long ts : timestampOrder.keySet()) { final TreeMap cumulativeBuckets = new TreeMap<>(); for (final BucketEntry bucket : group.buckets) { - final Double leBound = parseLeValue((String) bucket.labels.attributes.get(LE_LABEL)); + final Double leBound = PrometheusMetricUtils.parseLeValue((String) bucket.labels.attributes.get(LE_LABEL)); if (leBound == null) { continue; } @@ -343,7 +340,7 @@ private List> convertHistogramGroup(final HistogramGroup group) { .withExplicitBoundsList(explicitBounds) .withBucketCount(perBucketCounts.size()) .withExplicitBoundsCount(explicitBounds.size()) - .withAggregationTemporality("AGGREGATION_TEMPORALITY_CUMULATIVE") + .withAggregationTemporality(PrometheusMetricUtils.AGGREGATION_TEMPORALITY_CUMULATIVE) .withAttributes(new HashMap<>(commonAttributes)) .withServiceName(serviceName) .withTimeReceived(timeReceived) @@ -373,14 +370,14 @@ private List> convertSummaryGroup(final SummaryGroup group) { final List> records = new ArrayList<>(); final Map commonAttributes = new HashMap<>(group.quantiles.get(0).labels.commonLabels); - final String serviceName = extractServiceName(commonAttributes); + final String serviceName = PrometheusMetricUtils.extractServiceName(commonAttributes); final Instant timeReceived = Instant.now(); for (final long ts : timestampOrder.keySet()) { final List quantiles = new ArrayList<>(); for (final QuantileEntry qe : group.quantiles) { - final Double quantileValue = parseQuantileValue( + final Double quantileValue = PrometheusMetricUtils.parseQuantileValue( (String) qe.labels.attributes.get(QUANTILE_LABEL)); if (quantileValue == null) { continue; @@ -428,21 +425,21 @@ private List> convertSummaryGroup(final SummaryGroup group) { private List> convertStandalone(final StandaloneTimeSeries standalone) { final List> records = new ArrayList<>(); - final String serviceName = extractServiceName(standalone.labels.attributes); + final String serviceName = PrometheusMetricUtils.extractServiceName(standalone.labels.attributes); final Instant timeReceived = Instant.now(); for (final Types.Sample sample : standalone.timeSeries.getSamplesList()) { final String timestamp = resolveTimestamp(sample.getTimestamp()); if (standalone.isCounter) { - final String counterName = stripCounterSuffix(standalone.labels.metricName); + final String counterName = PrometheusMetricUtils.stripCounterSuffix(standalone.labels.metricName); records.add(new Record<>(JacksonSum.builder() .withName(counterName) .withTime(timestamp) .withValue(sample.getValue()) .withAttributes(new HashMap<>(standalone.labels.attributes)) .withIsMonotonic(true) - .withAggregationTemporality("AGGREGATION_TEMPORALITY_CUMULATIVE") + .withAggregationTemporality(PrometheusMetricUtils.AGGREGATION_TEMPORALITY_CUMULATIVE) .withServiceName(serviceName) .withTimeReceived(timeReceived) .build(config.isFlattenLabels()))); @@ -461,49 +458,11 @@ private List> convertStandalone(final StandaloneTimeSeries standal return records; } - /** - * Extracts the service name from attributes using priority order: - * service.name > service_name > job > empty string. - */ - static String extractServiceName(final Map attributes) { - if (attributes.containsKey(SERVICE_NAME_LABEL)) { - return (String) attributes.get(SERVICE_NAME_LABEL); - } - if (attributes.containsKey(SERVICE_NAME_UNDERSCORE_LABEL)) { - return (String) attributes.get(SERVICE_NAME_UNDERSCORE_LABEL); - } - if (attributes.containsKey(JOB_LABEL)) { - return (String) attributes.get(JOB_LABEL); - } - return ""; - } - - /** - * Strips the {@code _total} or {@code _created} suffix from counter metric names. - */ - static String stripCounterSuffix(final String metricName) { - if (metricName.endsWith(TOTAL_SUFFIX)) { - return metricName.substring(0, metricName.length() - TOTAL_SUFFIX.length()); - } - if (metricName.endsWith(CREATED_SUFFIX)) { - return metricName.substring(0, metricName.length() - CREATED_SUFFIX.length()); - } - return metricName; - } - /** - * Infers whether a metric is a counter (Sum) based on its name suffix. - * - * @param metricName the metric name - * @return true if the metric is a counter - */ static boolean isCounter(final String metricName) { return metricName.endsWith(TOTAL_SUFFIX) || metricName.endsWith(CREATED_SUFFIX); } - /** - * Resolves a timestamp, using current time if the value is 0. - */ private static String resolveTimestamp(final long timestampMs) { if (timestampMs == 0) { return Instant.now().toString(); @@ -511,39 +470,6 @@ private static String resolveTimestamp(final long timestampMs) { return Instant.ofEpochMilli(timestampMs).toString(); } - /** - * Parses an {@code le} label value to a Double. Returns null if unparseable. - */ - static Double parseLeValue(final String leValue) { - if (leValue == null) { - return null; - } - if ("+Inf".equals(leValue)) { - return Double.POSITIVE_INFINITY; - } - try { - return Double.parseDouble(leValue); - } catch (final NumberFormatException e) { - LOG.warn("Skipping histogram bucket with unparseable le value: '{}'", leValue); - return null; - } - } - - /** - * Parses a {@code quantile} label value to a Double. Returns null if unparseable. - */ - static Double parseQuantileValue(final String quantileValue) { - if (quantileValue == null) { - return null; - } - try { - return Double.parseDouble(quantileValue); - } catch (final NumberFormatException e) { - LOG.warn("Skipping summary quantile with unparseable value: '{}'", quantileValue); - return null; - } - } - /** * Gets the sample value as long at a given timestamp from a TimeSeries. * Returns 0 if no sample exists at the requested timestamp. diff --git a/data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/TextExpositionParser.java b/data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/TextExpositionParser.java index 082cba6ccb..6003cdf6e3 100644 --- a/data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/TextExpositionParser.java +++ b/data-prepper-plugins/prometheus-source/src/main/java/org/opensearch/dataprepper/plugins/source/prometheus/TextExpositionParser.java @@ -40,9 +40,6 @@ public class TextExpositionParser { private static final String SUM_SUFFIX = "_sum"; private static final String LE_LABEL = "le"; private static final String QUANTILE_LABEL = "quantile"; - private static final String SERVICE_NAME_LABEL = "service.name"; - private static final String SERVICE_NAME_UNDERSCORE_LABEL = "service_name"; - private static final String JOB_LABEL = "job"; private static final String TYPE_COUNTER = "counter"; private static final String TYPE_GAUGE = "gauge"; @@ -50,7 +47,6 @@ public class TextExpositionParser { private static final String TYPE_SUMMARY = "summary"; private static final String TYPE_UNTYPED = "untyped"; - private static final String AGGREGATION_TEMPORALITY_CUMULATIVE = "AGGREGATION_TEMPORALITY_CUMULATIVE"; private static final String[] TYPE_LOOKUP_SUFFIXES = {BUCKET_SUFFIX, COUNT_SUFFIX, SUM_SUFFIX, TOTAL_SUFFIX, CREATED_SUFFIX}; @@ -291,7 +287,7 @@ String resolveType(final String sampleName, final Map declaredTy private Record buildGaugeRecord(final ParsedSample sample, final Instant timeReceived) { final Map attributes = new HashMap<>(sample.labels); - final String serviceName = extractServiceName(attributes); + final String serviceName = PrometheusMetricUtils.extractServiceName(attributes); final String timestamp = resolveTimestamp(sample.timestampMs, timeReceived); final Event event = JacksonGauge.builder() @@ -308,16 +304,16 @@ private Record buildGaugeRecord(final ParsedSample sample, final Instant private Record buildSumRecord(final ParsedSample sample, final Instant timeReceived) { final Map attributes = new HashMap<>(sample.labels); - final String serviceName = extractServiceName(attributes); + final String serviceName = PrometheusMetricUtils.extractServiceName(attributes); final String timestamp = resolveTimestamp(sample.timestampMs, timeReceived); final Event event = JacksonSum.builder() - .withName(stripCounterSuffix(sample.name)) + .withName(PrometheusMetricUtils.stripCounterSuffix(sample.name)) .withTime(timestamp) .withValue(sample.value) .withAttributes(attributes) .withIsMonotonic(true) - .withAggregationTemporality(AGGREGATION_TEMPORALITY_CUMULATIVE) + .withAggregationTemporality(PrometheusMetricUtils.AGGREGATION_TEMPORALITY_CUMULATIVE) .withServiceName(serviceName) .withTimeReceived(timeReceived) .build(flattenLabels); @@ -337,7 +333,7 @@ private void accumulateHistogram(final ParsedSample sample, if (sample.name.endsWith(BUCKET_SUFFIX)) { final String leStr = sample.labels.get(LE_LABEL); - final Double leBound = parseLeValue(leStr); + final Double leBound = PrometheusMetricUtils.parseLeValue(leStr); if (leBound != null && !Double.isNaN(sample.value)) { acc.cumulativeBuckets.put(leBound, (long) sample.value); } @@ -361,7 +357,7 @@ private void accumulateSummary(final ParsedSample sample, k -> new SummaryAccumulator(baseName, commonLabels, sample.timestampMs)); if (sample.labels.containsKey(QUANTILE_LABEL)) { - final Double quantile = parseQuantileValue(sample.labels.get(QUANTILE_LABEL)); + final Double quantile = PrometheusMetricUtils.parseQuantileValue(sample.labels.get(QUANTILE_LABEL)); if (quantile != null) { acc.quantiles.add(new DefaultQuantile(quantile, sample.value)); } @@ -398,7 +394,7 @@ private Record buildHistogramRecord(final HistogramAccumulator acc, final } final Map attributes = new HashMap<>(acc.commonLabels); - final String serviceName = extractServiceName(attributes); + final String serviceName = PrometheusMetricUtils.extractServiceName(attributes); final String timestamp = resolveTimestamp(acc.timestampMs, timeReceived); final Event event = JacksonHistogram.builder() @@ -410,7 +406,7 @@ private Record buildHistogramRecord(final HistogramAccumulator acc, final .withExplicitBoundsList(explicitBounds) .withBucketCount(perBucketCounts.size()) .withExplicitBoundsCount(explicitBounds.size()) - .withAggregationTemporality(AGGREGATION_TEMPORALITY_CUMULATIVE) + .withAggregationTemporality(PrometheusMetricUtils.AGGREGATION_TEMPORALITY_CUMULATIVE) .withAttributes(attributes) .withServiceName(serviceName) .withTimeReceived(timeReceived) @@ -425,7 +421,7 @@ private Record buildSummaryRecord(final SummaryAccumulator acc, final Ins } final Map attributes = new HashMap<>(acc.commonLabels); - final String serviceName = extractServiceName(attributes); + final String serviceName = PrometheusMetricUtils.extractServiceName(attributes); final String timestamp = resolveTimestamp(acc.timestampMs, timeReceived); final Event event = JacksonSummary.builder() @@ -443,29 +439,6 @@ private Record buildSummaryRecord(final SummaryAccumulator acc, final Ins return new Record<>(event); } - static String extractServiceName(final Map attributes) { - if (attributes.containsKey(SERVICE_NAME_LABEL)) { - return (String) attributes.get(SERVICE_NAME_LABEL); - } - if (attributes.containsKey(SERVICE_NAME_UNDERSCORE_LABEL)) { - return (String) attributes.get(SERVICE_NAME_UNDERSCORE_LABEL); - } - if (attributes.containsKey(JOB_LABEL)) { - return (String) attributes.get(JOB_LABEL); - } - return ""; - } - - static String stripCounterSuffix(final String metricName) { - if (metricName.endsWith(TOTAL_SUFFIX)) { - return metricName.substring(0, metricName.length() - TOTAL_SUFFIX.length()); - } - if (metricName.endsWith(CREATED_SUFFIX)) { - return metricName.substring(0, metricName.length() - CREATED_SUFFIX.length()); - } - return metricName; - } - static String resolveTimestamp(final Long timestampMs, final Instant timeReceived) { if (timestampMs == null) { return timeReceived.toString(); @@ -473,36 +446,6 @@ static String resolveTimestamp(final Long timestampMs, final Instant timeReceive return Instant.ofEpochMilli(timestampMs).toString(); } - static Double parseLeValue(final String leValue) { - if (leValue == null) { - return null; - } - if ("+Inf".equals(leValue)) { - return Double.POSITIVE_INFINITY; - } - if ("-Inf".equals(leValue)) { - return Double.NEGATIVE_INFINITY; - } - try { - return Double.parseDouble(leValue); - } catch (final NumberFormatException e) { - LOG.warn("Skipping histogram bucket with unparseable le value: '{}'", leValue); - return null; - } - } - - static Double parseQuantileValue(final String quantileValue) { - if (quantileValue == null) { - return null; - } - try { - return Double.parseDouble(quantileValue); - } catch (final NumberFormatException e) { - LOG.warn("Skipping summary quantile with unparseable value: '{}'", quantileValue); - return null; - } - } - static double parseValue(final String valueStr) { if ("NaN".equalsIgnoreCase(valueStr)) { return Double.NaN; diff --git a/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusMetricUtilsTest.java b/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusMetricUtilsTest.java new file mode 100644 index 0000000000..2678eca370 --- /dev/null +++ b/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusMetricUtilsTest.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.prometheus; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +class PrometheusMetricUtilsTest { + + @Test + void extractServiceName_returns_service_name_label() { + final Map attributes = new HashMap<>(); + attributes.put("service.name", "my-service"); + attributes.put("job", "fallback"); + assertThat(PrometheusMetricUtils.extractServiceName(attributes), equalTo("my-service")); + } + + @Test + void extractServiceName_returns_service_name_underscore_label() { + final Map attributes = new HashMap<>(); + attributes.put("service_name", "my-service"); + attributes.put("job", "fallback"); + assertThat(PrometheusMetricUtils.extractServiceName(attributes), equalTo("my-service")); + } + + @Test + void extractServiceName_returns_job_label_as_fallback() { + final Map attributes = new HashMap<>(); + attributes.put("job", "my-job"); + assertThat(PrometheusMetricUtils.extractServiceName(attributes), equalTo("my-job")); + } + + @Test + void extractServiceName_returns_empty_when_no_match() { + final Map attributes = new HashMap<>(); + attributes.put("instance", "localhost:9090"); + assertThat(PrometheusMetricUtils.extractServiceName(attributes), equalTo("")); + } + + @ParameterizedTest + @CsvSource({ + "http_requests_total, http_requests", + "process_cpu_seconds_total, process_cpu_seconds", + "some_metric_created, some_metric", + "gauge_metric, gauge_metric" + }) + void stripCounterSuffix_removes_expected_suffix(final String input, final String expected) { + assertThat(PrometheusMetricUtils.stripCounterSuffix(input), equalTo(expected)); + } + + @Test + void parseLeValue_returns_null_for_null_input() { + assertThat(PrometheusMetricUtils.parseLeValue(null), nullValue()); + } + + @Test + void parseLeValue_returns_positive_infinity() { + assertThat(PrometheusMetricUtils.parseLeValue("+Inf"), equalTo(Double.POSITIVE_INFINITY)); + } + + @Test + void parseLeValue_returns_negative_infinity() { + assertThat(PrometheusMetricUtils.parseLeValue("-Inf"), equalTo(Double.NEGATIVE_INFINITY)); + } + + @ParameterizedTest + @ValueSource(strings = {"0.5", "1.0", "10.0", "100"}) + void parseLeValue_returns_parsed_double(final String leValue) { + assertThat(PrometheusMetricUtils.parseLeValue(leValue), equalTo(Double.parseDouble(leValue))); + } + + @Test + void parseLeValue_returns_null_for_unparseable() { + assertThat(PrometheusMetricUtils.parseLeValue("notanumber"), nullValue()); + } + + @Test + void parseQuantileValue_returns_null_for_null_input() { + assertThat(PrometheusMetricUtils.parseQuantileValue(null), nullValue()); + } + + @ParameterizedTest + @ValueSource(strings = {"0.5", "0.9", "0.99", "1.0"}) + void parseQuantileValue_returns_parsed_double(final String quantileValue) { + assertThat(PrometheusMetricUtils.parseQuantileValue(quantileValue), equalTo(Double.parseDouble(quantileValue))); + } + + @Test + void parseQuantileValue_returns_null_for_unparseable() { + assertThat(PrometheusMetricUtils.parseQuantileValue("bad"), nullValue()); + } + + @Test + void aggregation_temporality_constant_has_expected_value() { + assertThat(PrometheusMetricUtils.AGGREGATION_TEMPORALITY_CUMULATIVE, + equalTo("AGGREGATION_TEMPORALITY_CUMULATIVE")); + } +} diff --git a/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusOutputConsistencyTest.java b/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusOutputConsistencyTest.java new file mode 100644 index 0000000000..a2f926f972 --- /dev/null +++ b/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusOutputConsistencyTest.java @@ -0,0 +1,102 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.prometheus; + +import com.arpnetworking.metrics.prometheus.Remote; +import com.arpnetworking.metrics.prometheus.Types; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.metric.Metric; +import org.opensearch.dataprepper.model.record.Record; + +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class PrometheusOutputConsistencyTest { + + private static final long TIMESTAMP_MS = 1395066363000L; + + @Mock + private PrometheusRemoteWriteSourceConfig config; + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void gauge_output_is_consistent_across_parsers(final boolean flattenLabels) throws PrometheusParseException { + final String textBody = "# TYPE temperature_celsius gauge\n" + + "temperature_celsius{location=\"outside\"} 21.3 " + TIMESTAMP_MS + "\n"; + + final Remote.WriteRequest writeRequest = Remote.WriteRequest.newBuilder() + .addTimeseries(Types.TimeSeries.newBuilder() + .addLabels(Types.Label.newBuilder().setName("__name__").setValue("temperature_celsius").build()) + .addLabels(Types.Label.newBuilder().setName("location").setValue("outside").build()) + .addSamples(Types.Sample.newBuilder().setValue(21.3).setTimestamp(TIMESTAMP_MS).build()) + .build()) + .build(); + + final List> textRecords = new TextExpositionParser(flattenLabels).parse(textBody); + final List> protoRecords = parseProtobuf(writeRequest, flattenLabels); + + assertThat(textRecords, hasSize(1)); + assertThat(protoRecords, hasSize(1)); + + final Metric textMetric = (Metric) textRecords.get(0).getData(); + final Metric protoMetric = (Metric) protoRecords.get(0).getData(); + + assertThat(protoMetric.getKind(), equalTo(textMetric.getKind())); + assertThat(protoMetric.getName(), equalTo("temperature_celsius")); + assertThat(textMetric.getName(), equalTo("temperature_celsius")); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void counter_output_is_consistent_across_parsers(final boolean flattenLabels) throws PrometheusParseException { + final String textBody = "# TYPE http_requests_total counter\n" + + "http_requests_total{method=\"GET\",code=\"200\"} 1027 " + TIMESTAMP_MS + "\n"; + + final Remote.WriteRequest writeRequest = Remote.WriteRequest.newBuilder() + .addTimeseries(Types.TimeSeries.newBuilder() + .addLabels(Types.Label.newBuilder().setName("__name__").setValue("http_requests_total").build()) + .addLabels(Types.Label.newBuilder().setName("method").setValue("GET").build()) + .addLabels(Types.Label.newBuilder().setName("code").setValue("200").build()) + .addSamples(Types.Sample.newBuilder().setValue(1027.0).setTimestamp(TIMESTAMP_MS).build()) + .build()) + .build(); + + final List> textRecords = new TextExpositionParser(flattenLabels).parse(textBody); + final List> protoRecords = parseProtobuf(writeRequest, flattenLabels); + + assertThat(textRecords, hasSize(1)); + assertThat(protoRecords, hasSize(1)); + + final Metric textMetric = (Metric) textRecords.get(0).getData(); + final Metric protoMetric = (Metric) protoRecords.get(0).getData(); + + assertThat(protoMetric.getKind(), equalTo(textMetric.getKind())); + assertThat(protoMetric.getName(), equalTo("http_requests")); + assertThat(textMetric.getName(), equalTo("http_requests")); + } + + private List> parseProtobuf(final Remote.WriteRequest writeRequest, final boolean flattenLabels) + throws PrometheusParseException { + when(config.isFlattenLabels()).thenReturn(flattenLabels); + final RemoteWriteProtobufParser parser = new RemoteWriteProtobufParser(config); + return parser.parseDecompressed(writeRequest.toByteArray()); + } +} diff --git a/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusScrapeConfigTest.java b/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusScrapeConfigTest.java index b91df6ee10..4cbcc3891d 100644 --- a/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusScrapeConfigTest.java +++ b/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/PrometheusScrapeConfigTest.java @@ -124,4 +124,38 @@ void testDeserializationWithMultipleTargets() throws Exception { assertThat(config.getTargets().get(0).getUrl(), equalTo("http://host1:9090/metrics")); assertThat(config.getTargets().get(1).getUrl(), equalTo("http://host2:9090/metrics")); } + + @Test + void isTargetUrlSchemeValid_returns_true_when_insecure() throws Exception { + final String json = "{\"targets\": [{\"url\": \"http://host:9090/metrics\"}], \"insecure\": true}"; + final PrometheusScrapeConfig config = OBJECT_MAPPER.readValue(json, PrometheusScrapeConfig.class); + assertThat(config.isTargetUrlSchemeValid(), is(true)); + } + + @Test + void isTargetUrlSchemeValid_returns_true_when_targets_null() { + final PrometheusScrapeConfig config = new PrometheusScrapeConfig(); + assertThat(config.isTargetUrlSchemeValid(), is(true)); + } + + @Test + void isTargetUrlSchemeValid_returns_false_when_http_and_not_insecure() throws Exception { + final String json = "{\"targets\": [{\"url\": \"http://host:9090/metrics\"}], \"insecure\": false}"; + final PrometheusScrapeConfig config = OBJECT_MAPPER.readValue(json, PrometheusScrapeConfig.class); + assertThat(config.isTargetUrlSchemeValid(), is(false)); + } + + @Test + void isTargetUrlSchemeValid_returns_true_when_https_and_not_insecure() throws Exception { + final String json = "{\"targets\": [{\"url\": \"https://host:9090/metrics\"}], \"insecure\": false}"; + final PrometheusScrapeConfig config = OBJECT_MAPPER.readValue(json, PrometheusScrapeConfig.class); + assertThat(config.isTargetUrlSchemeValid(), is(true)); + } + + @Test + void isTargetUrlSchemeValid_returns_true_when_target_url_null() throws Exception { + final String json = "{\"targets\": [{}], \"insecure\": false}"; + final PrometheusScrapeConfig config = OBJECT_MAPPER.readValue(json, PrometheusScrapeConfig.class); + assertThat(config.isTargetUrlSchemeValid(), is(true)); + } } \ No newline at end of file diff --git a/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/RemoteWriteProtobufParserTest.java b/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/RemoteWriteProtobufParserTest.java index 525770b06d..df9c423580 100644 --- a/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/RemoteWriteProtobufParserTest.java +++ b/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/RemoteWriteProtobufParserTest.java @@ -522,23 +522,23 @@ void testExtractServiceNamePriorityOrder() { attrs.put("service.name", "svc1"); attrs.put("service_name", "svc2"); attrs.put("job", "svc3"); - assertThat(RemoteWriteProtobufParser.extractServiceName(attrs), equalTo("svc1")); + assertThat(PrometheusMetricUtils.extractServiceName(attrs), equalTo("svc1")); attrs.remove("service.name"); - assertThat(RemoteWriteProtobufParser.extractServiceName(attrs), equalTo("svc2")); + assertThat(PrometheusMetricUtils.extractServiceName(attrs), equalTo("svc2")); attrs.remove("service_name"); - assertThat(RemoteWriteProtobufParser.extractServiceName(attrs), equalTo("svc3")); + assertThat(PrometheusMetricUtils.extractServiceName(attrs), equalTo("svc3")); attrs.remove("job"); - assertThat(RemoteWriteProtobufParser.extractServiceName(attrs), equalTo("")); + assertThat(PrometheusMetricUtils.extractServiceName(attrs), equalTo("")); } @Test void testStripCounterSuffix() { - assertThat(RemoteWriteProtobufParser.stripCounterSuffix("http_requests_total"), equalTo("http_requests")); - assertThat(RemoteWriteProtobufParser.stripCounterSuffix("cpu_temperature"), equalTo("cpu_temperature")); - assertThat(RemoteWriteProtobufParser.stripCounterSuffix("http_requests_created"), equalTo("http_requests")); + assertThat(PrometheusMetricUtils.stripCounterSuffix("http_requests_total"), equalTo("http_requests")); + assertThat(PrometheusMetricUtils.stripCounterSuffix("cpu_temperature"), equalTo("cpu_temperature")); + assertThat(PrometheusMetricUtils.stripCounterSuffix("http_requests_created"), equalTo("http_requests")); } @Test @@ -758,17 +758,17 @@ void testHistogramGroupedByLabelSet() throws Exception { @Test void testParseLeValueValidAndInvalid() { - assertThat(RemoteWriteProtobufParser.parseLeValue("0.5"), closeTo(0.5, 0.001)); - assertThat(RemoteWriteProtobufParser.parseLeValue("+Inf"), equalTo(Double.POSITIVE_INFINITY)); - assertThat(RemoteWriteProtobufParser.parseLeValue(null), nullValue()); - assertThat(RemoteWriteProtobufParser.parseLeValue("not_a_number"), nullValue()); + assertThat(PrometheusMetricUtils.parseLeValue("0.5"), closeTo(0.5, 0.001)); + assertThat(PrometheusMetricUtils.parseLeValue("+Inf"), equalTo(Double.POSITIVE_INFINITY)); + assertThat(PrometheusMetricUtils.parseLeValue(null), nullValue()); + assertThat(PrometheusMetricUtils.parseLeValue("not_a_number"), nullValue()); } @Test void testParseQuantileValueValidAndInvalid() { - assertThat(RemoteWriteProtobufParser.parseQuantileValue("0.99"), closeTo(0.99, 0.001)); - assertThat(RemoteWriteProtobufParser.parseQuantileValue(null), nullValue()); - assertThat(RemoteWriteProtobufParser.parseQuantileValue("not_a_number"), nullValue()); + assertThat(PrometheusMetricUtils.parseQuantileValue("0.99"), closeTo(0.99, 0.001)); + assertThat(PrometheusMetricUtils.parseQuantileValue(null), nullValue()); + assertThat(PrometheusMetricUtils.parseQuantileValue("not_a_number"), nullValue()); } @Test diff --git a/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/TextExpositionParserTest.java b/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/TextExpositionParserTest.java index bf2f5406b3..dcfa3a92d4 100644 --- a/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/TextExpositionParserTest.java +++ b/data-prepper-plugins/prometheus-source/src/test/java/org/opensearch/dataprepper/plugins/source/prometheus/TextExpositionParserTest.java @@ -521,26 +521,26 @@ void testExtractServiceNamePriority() { attrs.put("service.name", "svc-dot"); attrs.put("service_name", "svc-underscore"); attrs.put("job", "svc-job"); - assertThat(TextExpositionParser.extractServiceName(attrs), equalTo("svc-dot")); + assertThat(PrometheusMetricUtils.extractServiceName(attrs), equalTo("svc-dot")); final Map attrsNoServiceName = new HashMap<>(); attrsNoServiceName.put("service_name", "svc-underscore"); attrsNoServiceName.put("job", "svc-job"); - assertThat(TextExpositionParser.extractServiceName(attrsNoServiceName), equalTo("svc-underscore")); + assertThat(PrometheusMetricUtils.extractServiceName(attrsNoServiceName), equalTo("svc-underscore")); final Map attrsJobOnly = new HashMap<>(); attrsJobOnly.put("job", "svc-job"); - assertThat(TextExpositionParser.extractServiceName(attrsJobOnly), equalTo("svc-job")); + assertThat(PrometheusMetricUtils.extractServiceName(attrsJobOnly), equalTo("svc-job")); final Map attrsEmpty = new HashMap<>(); - assertThat(TextExpositionParser.extractServiceName(attrsEmpty), equalTo("")); + assertThat(PrometheusMetricUtils.extractServiceName(attrsEmpty), equalTo("")); } @Test void testStripCounterSuffix() { - assertThat(TextExpositionParser.stripCounterSuffix("http_requests_total"), equalTo("http_requests")); - assertThat(TextExpositionParser.stripCounterSuffix("http_requests_created"), equalTo("http_requests")); - assertThat(TextExpositionParser.stripCounterSuffix("http_requests"), equalTo("http_requests")); + assertThat(PrometheusMetricUtils.stripCounterSuffix("http_requests_total"), equalTo("http_requests")); + assertThat(PrometheusMetricUtils.stripCounterSuffix("http_requests_created"), equalTo("http_requests")); + assertThat(PrometheusMetricUtils.stripCounterSuffix("http_requests"), equalTo("http_requests")); } @Test @@ -567,18 +567,18 @@ void testParseValueSpecialValues() { @Test void testParseLeValue() { - assertThat(TextExpositionParser.parseLeValue(null), equalTo(null)); - assertThat(TextExpositionParser.parseLeValue("+Inf"), equalTo(Double.POSITIVE_INFINITY)); - assertThat(TextExpositionParser.parseLeValue("-Inf"), equalTo(Double.NEGATIVE_INFINITY)); - assertThat(TextExpositionParser.parseLeValue("0.5"), closeTo(0.5, 0.001)); - assertThat(TextExpositionParser.parseLeValue("not_a_number"), equalTo(null)); + assertThat(PrometheusMetricUtils.parseLeValue(null), equalTo(null)); + assertThat(PrometheusMetricUtils.parseLeValue("+Inf"), equalTo(Double.POSITIVE_INFINITY)); + assertThat(PrometheusMetricUtils.parseLeValue("-Inf"), equalTo(Double.NEGATIVE_INFINITY)); + assertThat(PrometheusMetricUtils.parseLeValue("0.5"), closeTo(0.5, 0.001)); + assertThat(PrometheusMetricUtils.parseLeValue("not_a_number"), equalTo(null)); } @Test void testParseQuantileValue() { - assertThat(TextExpositionParser.parseQuantileValue(null), equalTo(null)); - assertThat(TextExpositionParser.parseQuantileValue("0.99"), closeTo(0.99, 0.001)); - assertThat(TextExpositionParser.parseQuantileValue("bad_value"), equalTo(null)); + assertThat(PrometheusMetricUtils.parseQuantileValue(null), equalTo(null)); + assertThat(PrometheusMetricUtils.parseQuantileValue("0.99"), closeTo(0.99, 0.001)); + assertThat(PrometheusMetricUtils.parseQuantileValue("bad_value"), equalTo(null)); } @Test @@ -822,4 +822,12 @@ void testServiceNameEmptyWhenNoJobLabel() { final Metric metric = (Metric) results.get(0).getData(); assertThat(metric.getServiceName(), equalTo("")); } + + @Test + void testParseSampleLineWithDecimalTimestamp() { + final TextExpositionParser.ParsedSample sample = parser.parseSampleLine("metric_name 42.0 1625000000.123"); + assertThat(sample, notNullValue()); + assertThat(sample.value, closeTo(42.0, 0.001)); + assertThat(sample.timestampMs, equalTo(1625000000123L)); + } } \ No newline at end of file