Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -471,11 +473,8 @@ private Labels convertAttributes(
? filterAllowedResourceAttributeKeys(resource)
: Collections.emptyList();

Map<String, String> labelNameToValue = new HashMap<>();
attributes.forEach(
(key, value) ->
labelNameToValue.put(
convertLabelName(key.getKey()), toLabelValue(key.getType(), value)));
Map<String, String> labelNameToValue = new LinkedHashMap<>();
mergeNormalizedAttributeLabels(labelNameToValue, attributes);

for (int i = 0; i < additionalAttributes.length; i += 2) {
labelNameToValue.putIfAbsent(
Expand All @@ -501,13 +500,18 @@ private Labels convertAttributes(

if (resource != null) {
Attributes resourceAttributes = resource.getAttributes();
Map<String, List<LabelValue>> resourceLabelNameToValues = new LinkedHashMap<>();
for (AttributeKey attributeKey : allowedAttributeKeys) {
Object attributeValue = resourceAttributes.get(attributeKey);
if (attributeValue != null) {
labelNameToValue.putIfAbsent(
convertLabelName(attributeKey.getKey()), attributeValue.toString());
addNormalizedLabelValue(
resourceLabelNameToValues,
convertLabelName(attributeKey.getKey()),
attributeKey.getKey(),
attributeValue.toString());
}
}
joinCollidingLabelValues(resourceLabelNameToValues).forEach(labelNameToValue::putIfAbsent);
}

String[] names = new String[labelNameToValue.size()];
Expand All @@ -523,6 +527,42 @@ private Labels convertAttributes(
return Labels.of(names, values);
}

private static void mergeNormalizedAttributeLabels(
Map<String, String> labelNameToValue, Attributes attributes) {
Map<String, List<LabelValue>> labelNameToValues = new LinkedHashMap<>();
attributes.forEach(
(key, value) ->
addNormalizedLabelValue(
labelNameToValues,
convertLabelName(key.getKey()),
key.getKey(),
toLabelValue(key.getType(), value)));
labelNameToValue.putAll(joinCollidingLabelValues(labelNameToValues));
}

private static void addNormalizedLabelValue(
Map<String, List<LabelValue>> labelNameToValues,
String labelName,
String originalKey,
String value) {
labelNameToValues
.computeIfAbsent(labelName, ignored -> new ArrayList<>())
.add(new LabelValue(originalKey, value));
}

private static Map<String, String> joinCollidingLabelValues(
Map<String, List<LabelValue>> labelNameToValues) {
Map<String, String> joinedLabels = new LinkedHashMap<>();
labelNameToValues.forEach(
(labelName, labelValues) -> {
labelValues.sort(Comparator.comparing(LabelValue::originalKey));
joinedLabels.put(
labelName,
labelValues.stream().map(LabelValue::value).collect(Collectors.joining(";")));
});
return joinedLabels;
}

private List<AttributeKey<?>> filterAllowedResourceAttributeKeys(@Nullable Resource resource) {
requireNonNull(
allowedResourceAttributesFilter,
Expand Down Expand Up @@ -739,4 +779,22 @@ public static String toJsonValidStr(String str) {
sb.append('"');
return sb.toString();
}

private static final class LabelValue {
private final String originalKey;
private final String value;

private LabelValue(String originalKey, String value) {
this.originalKey = originalKey;
this.value = value;
}

private String originalKey() {
return originalKey;
}

private String value() {
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.common.KeyValue;
import io.opentelemetry.api.common.Value;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
Expand Down Expand Up @@ -48,9 +50,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -285,9 +290,37 @@ private static Stream<Arguments> resourceAttributesAdditionArgs() {
"my_metric_units",
"cluster=\"mycluster\",otel_scope_foo=\"bar\",otel_scope_name=\"scope\",otel_scope_schema_url=\"schemaUrl\",otel_scope_version=\"version\""));

arguments.add(
Arguments.of(
createSampleMetricData(
"my.metric",
"units",
MetricDataType.LONG_SUM,
Attributes.empty(),
Resource.create(
createMapAttributes("foo_bar", "b", "foo-bar", "c", "foo.bar", "a"))),
/* allowedResourceAttributesFilter= */ Predicates.startsWith("foo"),
"my_metric_units",
"foo_bar=\"c;a;b\",otel_scope_foo=\"bar\",otel_scope_name=\"scope\",otel_scope_schema_url=\"schemaUrl\",otel_scope_version=\"version\""));

return arguments.stream();
}

@Test
void metricAttributeCollisionsAreMergedAndSorted() {
Attributes attributes = createMapAttributes("foo_bar", "b", "foo-bar", "c", "foo.bar", "a");
MetricData metricData =
createSampleMetricData("sample", "1", MetricDataType.LONG_SUM, attributes, null);

MetricSnapshots snapshots = converter.convert(Collections.singletonList(metricData));

Optional<MetricSnapshot> metricSnapshot =
snapshots.stream().filter(snapshot -> snapshot instanceof CounterSnapshot).findFirst();
assertThat(metricSnapshot).isPresent();
Labels labels = metricSnapshot.get().getDataPoints().get(0).getLabels();
assertThat(labels.get("foo_bar")).isEqualTo("c;a;b");
}

@Test
void prometheusNameCollisionTest_Issue6277() {
// NOTE: Metrics with the same resolved prometheus name should merge. However,
Expand Down Expand Up @@ -555,4 +588,53 @@ void validateCacheIsBounded() {
// it never saw those resources before.
assertThat(predicateCalledCount.get()).isEqualTo(2);
}

private static Attributes createMapAttributes(String... keyValues) {
LinkedHashMap<AttributeKey<?>, Object> map = new LinkedHashMap<>();
for (int i = 0; i < keyValues.length; i += 2) {
map.put(stringKey(keyValues[i]), keyValues[i + 1]);
}
return new MapAttributes(map);
}

@SuppressWarnings("unchecked")
private static final class MapAttributes implements Attributes {

private final LinkedHashMap<AttributeKey<?>, Object> map;

private MapAttributes(Map<AttributeKey<?>, Object> map) {
this.map = new LinkedHashMap<>(map);
}

@Nullable
@Override
public <T> T get(AttributeKey<T> key) {
return (T) map.get(key);
}

@Override
public void forEach(BiConsumer<? super AttributeKey<?>, ? super Object> consumer) {
map.forEach(consumer);
}

@Override
public int size() {
return map.size();
}

@Override
public boolean isEmpty() {
return map.isEmpty();
}

@Override
public Map<AttributeKey<?>, Object> asMap() {
return map;
}

@Override
public AttributesBuilder toBuilder() {
throw new UnsupportedOperationException("not supported");
}
}
}
Loading