Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.prometheus;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

final class PrometheusMetricUtils {

private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricUtils.class);

static final String AGGREGATION_TEMPORALITY_CUMULATIVE = "AGGREGATION_TEMPORALITY_CUMULATIVE";

private static final String TOTAL_SUFFIX = "_total";
private static final String CREATED_SUFFIX = "_created";
private static final String SERVICE_NAME_LABEL = "service.name";
private static final String SERVICE_NAME_UNDERSCORE_LABEL = "service_name";
private static final String JOB_LABEL = "job";

private PrometheusMetricUtils() {
}

static String extractServiceName(final Map<String, Object> attributes) {
if (attributes.containsKey(SERVICE_NAME_LABEL)) {
return (String) attributes.get(SERVICE_NAME_LABEL);
}
if (attributes.containsKey(SERVICE_NAME_UNDERSCORE_LABEL)) {
return (String) attributes.get(SERVICE_NAME_UNDERSCORE_LABEL);
}
if (attributes.containsKey(JOB_LABEL)) {
return (String) attributes.get(JOB_LABEL);
}
return "";
}

static String stripCounterSuffix(final String metricName) {
if (metricName.endsWith(TOTAL_SUFFIX)) {
return metricName.substring(0, metricName.length() - TOTAL_SUFFIX.length());
}
if (metricName.endsWith(CREATED_SUFFIX)) {
return metricName.substring(0, metricName.length() - CREATED_SUFFIX.length());
}
return metricName;
}

static Double parseLeValue(final String leValue) {
if (leValue == null) {
return null;
}
if ("+Inf".equals(leValue)) {
return Double.POSITIVE_INFINITY;
}
if ("-Inf".equals(leValue)) {
return Double.NEGATIVE_INFINITY;
}
try {
return Double.parseDouble(leValue);
} catch (final NumberFormatException e) {
LOG.warn("Skipping histogram bucket with unparseable le value: '{}'", leValue);
return null;
}
}

static Double parseQuantileValue(final String quantileValue) {
if (quantileValue == null) {
return null;
}
try {
return Double.parseDouble(quantileValue);
} catch (final NumberFormatException e) {
LOG.warn("Skipping summary quantile with unparseable value: '{}'", quantileValue);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ public class RemoteWriteProtobufParser {
private static final String SUM_SUFFIX = "_sum";
private static final String TOTAL_SUFFIX = "_total";
private static final String CREATED_SUFFIX = "_created";
private static final String SERVICE_NAME_LABEL = "service.name";
private static final String SERVICE_NAME_UNDERSCORE_LABEL = "service_name";
private static final String JOB_LABEL = "job";

private final PrometheusRemoteWriteSourceConfig config;

Expand Down Expand Up @@ -284,13 +281,13 @@ private List<Record<Event>> convertHistogramGroup(final HistogramGroup group) {

final List<Record<Event>> records = new ArrayList<>();
final Map<String, Object> commonAttributes = new HashMap<>(group.buckets.get(0).labels.commonLabels);
final String serviceName = extractServiceName(commonAttributes);
final String serviceName = PrometheusMetricUtils.extractServiceName(commonAttributes);
final Instant timeReceived = Instant.now();

for (final long ts : timestampOrder.keySet()) {
final TreeMap<Double, Long> cumulativeBuckets = new TreeMap<>();
for (final BucketEntry bucket : group.buckets) {
final Double leBound = parseLeValue((String) bucket.labels.attributes.get(LE_LABEL));
final Double leBound = PrometheusMetricUtils.parseLeValue((String) bucket.labels.attributes.get(LE_LABEL));
if (leBound == null) {
continue;
}
Expand Down Expand Up @@ -343,7 +340,7 @@ private List<Record<Event>> convertHistogramGroup(final HistogramGroup group) {
.withExplicitBoundsList(explicitBounds)
.withBucketCount(perBucketCounts.size())
.withExplicitBoundsCount(explicitBounds.size())
.withAggregationTemporality("AGGREGATION_TEMPORALITY_CUMULATIVE")
.withAggregationTemporality(PrometheusMetricUtils.AGGREGATION_TEMPORALITY_CUMULATIVE)
.withAttributes(new HashMap<>(commonAttributes))
.withServiceName(serviceName)
.withTimeReceived(timeReceived)
Expand Down Expand Up @@ -373,14 +370,14 @@ private List<Record<Event>> convertSummaryGroup(final SummaryGroup group) {

final List<Record<Event>> records = new ArrayList<>();
final Map<String, Object> commonAttributes = new HashMap<>(group.quantiles.get(0).labels.commonLabels);
final String serviceName = extractServiceName(commonAttributes);
final String serviceName = PrometheusMetricUtils.extractServiceName(commonAttributes);
final Instant timeReceived = Instant.now();

for (final long ts : timestampOrder.keySet()) {
final List<Quantile> quantiles = new ArrayList<>();

for (final QuantileEntry qe : group.quantiles) {
final Double quantileValue = parseQuantileValue(
final Double quantileValue = PrometheusMetricUtils.parseQuantileValue(
(String) qe.labels.attributes.get(QUANTILE_LABEL));
if (quantileValue == null) {
continue;
Expand Down Expand Up @@ -428,21 +425,21 @@ private List<Record<Event>> convertSummaryGroup(final SummaryGroup group) {

private List<Record<Event>> convertStandalone(final StandaloneTimeSeries standalone) {
final List<Record<Event>> records = new ArrayList<>();
final String serviceName = extractServiceName(standalone.labels.attributes);
final String serviceName = PrometheusMetricUtils.extractServiceName(standalone.labels.attributes);
final Instant timeReceived = Instant.now();

for (final Types.Sample sample : standalone.timeSeries.getSamplesList()) {
final String timestamp = resolveTimestamp(sample.getTimestamp());

if (standalone.isCounter) {
final String counterName = stripCounterSuffix(standalone.labels.metricName);
final String counterName = PrometheusMetricUtils.stripCounterSuffix(standalone.labels.metricName);
records.add(new Record<>(JacksonSum.builder()
.withName(counterName)
.withTime(timestamp)
.withValue(sample.getValue())
.withAttributes(new HashMap<>(standalone.labels.attributes))
.withIsMonotonic(true)
.withAggregationTemporality("AGGREGATION_TEMPORALITY_CUMULATIVE")
.withAggregationTemporality(PrometheusMetricUtils.AGGREGATION_TEMPORALITY_CUMULATIVE)
.withServiceName(serviceName)
.withTimeReceived(timeReceived)
.build(config.isFlattenLabels())));
Expand All @@ -461,89 +458,18 @@ private List<Record<Event>> convertStandalone(final StandaloneTimeSeries standal
return records;
}

/**
* Extracts the service name from attributes using priority order:
* service.name > service_name > job > empty string.
*/
static String extractServiceName(final Map<String, Object> attributes) {
if (attributes.containsKey(SERVICE_NAME_LABEL)) {
return (String) attributes.get(SERVICE_NAME_LABEL);
}
if (attributes.containsKey(SERVICE_NAME_UNDERSCORE_LABEL)) {
return (String) attributes.get(SERVICE_NAME_UNDERSCORE_LABEL);
}
if (attributes.containsKey(JOB_LABEL)) {
return (String) attributes.get(JOB_LABEL);
}
return "";
}

/**
* Strips the {@code _total} or {@code _created} suffix from counter metric names.
*/
static String stripCounterSuffix(final String metricName) {
if (metricName.endsWith(TOTAL_SUFFIX)) {
return metricName.substring(0, metricName.length() - TOTAL_SUFFIX.length());
}
if (metricName.endsWith(CREATED_SUFFIX)) {
return metricName.substring(0, metricName.length() - CREATED_SUFFIX.length());
}
return metricName;
}

/**
* Infers whether a metric is a counter (Sum) based on its name suffix.
*
* @param metricName the metric name
* @return true if the metric is a counter
*/
static boolean isCounter(final String metricName) {
return metricName.endsWith(TOTAL_SUFFIX) || metricName.endsWith(CREATED_SUFFIX);
}

/**
* Resolves a timestamp, using current time if the value is 0.
*/
private static String resolveTimestamp(final long timestampMs) {
if (timestampMs == 0) {
return Instant.now().toString();
}
return Instant.ofEpochMilli(timestampMs).toString();
}

/**
* Parses an {@code le} label value to a Double. Returns null if unparseable.
*/
static Double parseLeValue(final String leValue) {
if (leValue == null) {
return null;
}
if ("+Inf".equals(leValue)) {
return Double.POSITIVE_INFINITY;
}
try {
return Double.parseDouble(leValue);
} catch (final NumberFormatException e) {
LOG.warn("Skipping histogram bucket with unparseable le value: '{}'", leValue);
return null;
}
}

/**
* Parses a {@code quantile} label value to a Double. Returns null if unparseable.
*/
static Double parseQuantileValue(final String quantileValue) {
if (quantileValue == null) {
return null;
}
try {
return Double.parseDouble(quantileValue);
} catch (final NumberFormatException e) {
LOG.warn("Skipping summary quantile with unparseable value: '{}'", quantileValue);
return null;
}
}

/**
* Gets the sample value as long at a given timestamp from a TimeSeries.
* Returns 0 if no sample exists at the requested timestamp.
Expand Down
Loading
Loading