Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
value -> UTF8BytesString.create(key + ":" + value));
private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";

/**
* gRPC status code tag keys in priority order, matching the Go reference implementation. The
* first non-empty value wins.
*/
private static final String[] GRPC_STATUS_CODE_KEYS = {
InstrumentationTags.GRPC_STATUS_CODE, // rpc.grpc.status_code
InstrumentationTags.GRPC_CODE, // grpc.code
InstrumentationTags.RPC_GRPC_STATUS_CODE, // rpc.grpc.status.code
InstrumentationTags.GRPC_STATUS_CODE_LEGACY // grpc.status.code
};

private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_METRICS =
unmodifiableSet(
new HashSet<>(
Expand Down Expand Up @@ -331,8 +342,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK
CharSequence spanType = span.getType();
String grpcStatusCode = null;
if (spanType != null && RPC.contentEquals(spanType)) {
Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE);
grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null;
grpcStatusCode = extractGrpcStatusCode(span);
}
MetricKey newKey =
new MetricKey(
Expand Down Expand Up @@ -381,6 +391,26 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK
return span.getError() > 0;
}

/**
* Extracts gRPC status code from a span by checking tag keys in priority order: 1.
* rpc.grpc.status_code 2. grpc.code 3. rpc.grpc.status.code 4. grpc.status.code
*
* <p>Returns the string value of the first non-empty tag found, or null if none are present. This
* matches the Go reference implementation's extraction order.
*/
private static String extractGrpcStatusCode(CoreSpan<?> span) {
for (String key : GRPC_STATUS_CODE_KEYS) {
Object value = span.unsafeGetTag(key);
if (value != null) {
String strValue = value.toString();
if (!strValue.isEmpty()) {
return strValue;
}
}
}
return null;
}

private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
final Set<String> eligiblePeerTags = features.peerTags();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1549,6 +1549,58 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
aggregator.close()
}

def "should extract grpc status code using priority order: #description"() {
setup:
MetricWriter writer = Mock(MetricWriter)
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false)
aggregator.start()

when:
CountDownLatch latch = new CountDownLatch(1)
def span = new SimpleSpan("service", "grpc.server", "grpc.service/Method", "rpc", true, false, false, 0, 100, 0)
.setTag(SPAN_KIND, "server")
tags.each { k, v -> span.setTag(k, v) }
aggregator.publish([span])
aggregator.report()
def latchTriggered = latch.await(2, SECONDS)

then:
latchTriggered
1 * writer.startBucket(1, _, _)
1 * writer.add(new MetricKey(
"grpc.service/Method",
"service",
"grpc.server",
null,
"rpc",
0,
false,
false,
"server",
[],
null,
null,
expectedGrpcStatus
), _)
1 * writer.finishBucket() >> { latch.countDown() }

cleanup:
aggregator.close()

where:
description | tags | expectedGrpcStatus
"rpc.grpc.status_code is highest priority" | [(InstrumentationTags.GRPC_STATUS_CODE): 0, (InstrumentationTags.GRPC_CODE): "OK", (InstrumentationTags.RPC_GRPC_STATUS_CODE): "0", (InstrumentationTags.GRPC_STATUS_CODE_LEGACY): "OK"] | "0"
"grpc.code used when rpc.grpc.status_code absent" | [(InstrumentationTags.GRPC_CODE): "NOT_FOUND", (InstrumentationTags.RPC_GRPC_STATUS_CODE): "5", (InstrumentationTags.GRPC_STATUS_CODE_LEGACY): "NOT_FOUND"] | "NOT_FOUND"
"rpc.grpc.status.code used when higher priority absent" | [(InstrumentationTags.RPC_GRPC_STATUS_CODE): "14", (InstrumentationTags.GRPC_STATUS_CODE_LEGACY): "UNAVAILABLE"] | "14"
"grpc.status.code used as last fallback" | [(InstrumentationTags.GRPC_STATUS_CODE_LEGACY): "CANCELLED"] | "CANCELLED"
"no grpc status when no tags present" | [:] | null
}

def reportAndWaitUntilEmpty(ConflatingMetricsAggregator aggregator) {
waitUntilEmpty(aggregator)
aggregator.report()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public class InstrumentationTags {
public static final String STATUS_CODE = "status.code";
public static final String STATUS_DESCRIPTION = "status.description";
public static final String GRPC_STATUS_CODE = "rpc.grpc.status_code";
public static final String GRPC_CODE = "grpc.code";
public static final String RPC_GRPC_STATUS_CODE = "rpc.grpc.status.code";
public static final String GRPC_STATUS_CODE_LEGACY = "grpc.status.code";
public static final String MESSAGE_TYPE = "message.type";
public static final String MESSAGE_SIZE = "message.size";
public static final String HYSTRIX_COMMAND = "hystrix.command";
Expand Down
Loading