Skip to content

Commit b39ed92

Browse files
committed
fix(ccs): Reports gRPC status code in CCS
1 parent 4d0bac2 commit b39ed92

File tree

7 files changed

+212
-29
lines changed

7 files changed

+212
-29
lines changed

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package datadog.trace.common.metrics;
22

33
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT;
4+
import static datadog.trace.api.DDSpanTypes.RPC;
45
import static datadog.trace.api.DDTags.BASE_SERVICE;
56
import static datadog.trace.api.Functions.UTF8_ENCODE;
67
import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT;
@@ -79,6 +80,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
7980
DDCaches.newFixedSizeCache(512),
8081
value -> UTF8BytesString.create(key + ":" + value));
8182
private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";
83+
private static final String GRPC_STATUS_TAG = "grpc.status.code";
8284

8385
private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_METRICS =
8486
unmodifiableSet(
@@ -326,21 +328,28 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK
326328
httpEndpoint = httpEndpointObj != null ? httpEndpointObj.toString() : null;
327329
}
328330

331+
CharSequence spanType = span.getType();
332+
String grpcStatusCode = null;
333+
if (spanType != null && RPC.contentEquals(spanType)) {
334+
Object grpcStatusObj = span.unsafeGetTag(GRPC_STATUS_TAG);
335+
grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null;
336+
}
329337
MetricKey newKey =
330338
new MetricKey(
331339
span.getResourceName(),
332340
SERVICE_NAMES.computeIfAbsent(span.getServiceName(), UTF8_ENCODE),
333341
span.getOperationName(),
334342
span.getServiceNameSource(),
335-
span.getType(),
343+
spanType,
336344
span.getHttpStatusCode(),
337345
isSynthetic(span),
338346
span.getParentId() == 0,
339347
SPAN_KINDS.computeIfAbsent(
340348
spanKind, UTF8BytesString::create), // save repeated utf8 conversions
341349
getPeerTags(span, spanKind.toString()),
342350
httpMethod,
343-
httpEndpoint);
351+
httpEndpoint,
352+
grpcStatusCode);
344353
MetricKey key = keys.putIfAbsent(newKey, newKey);
345354
if (null == key) {
346355
key = newKey;

dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public final class MetricKey {
2323
private final List<UTF8BytesString> peerTags;
2424
private final UTF8BytesString httpMethod;
2525
private final UTF8BytesString httpEndpoint;
26+
private final UTF8BytesString grpcStatusCode;
2627

2728
public MetricKey(
2829
CharSequence resource,
@@ -36,7 +37,8 @@ public MetricKey(
3637
CharSequence spanKind,
3738
List<UTF8BytesString> peerTags,
3839
CharSequence httpMethod,
39-
CharSequence httpEndpoint) {
40+
CharSequence httpEndpoint,
41+
CharSequence grpcStatusCode) {
4042
this.resource = null == resource ? EMPTY : UTF8BytesString.create(resource);
4143
this.service = null == service ? EMPTY : UTF8BytesString.create(service);
4244
this.serviceSource = null == serviceSource ? null : UTF8BytesString.create(serviceSource);
@@ -49,6 +51,7 @@ public MetricKey(
4951
this.peerTags = peerTags == null ? Collections.emptyList() : peerTags;
5052
this.httpMethod = httpMethod == null ? null : UTF8BytesString.create(httpMethod);
5153
this.httpEndpoint = httpEndpoint == null ? null : UTF8BytesString.create(httpEndpoint);
54+
this.grpcStatusCode = grpcStatusCode == null ? null : UTF8BytesString.create(grpcStatusCode);
5255

5356
int tmpHash = 0;
5457
tmpHash = HashingUtils.addToHash(tmpHash, this.isTraceRoot);
@@ -63,6 +66,7 @@ public MetricKey(
6366
tmpHash = HashingUtils.addToHash(tmpHash, this.serviceSource);
6467
tmpHash = HashingUtils.addToHash(tmpHash, this.httpEndpoint);
6568
tmpHash = HashingUtils.addToHash(tmpHash, this.httpMethod);
69+
tmpHash = HashingUtils.addToHash(tmpHash, this.grpcStatusCode);
6670
this.hash = tmpHash;
6771
}
6872

@@ -114,6 +118,10 @@ public UTF8BytesString getHttpEndpoint() {
114118
return httpEndpoint;
115119
}
116120

121+
public UTF8BytesString getGrpcStatusCode() {
122+
return grpcStatusCode;
123+
}
124+
117125
@Override
118126
public boolean equals(Object o) {
119127
if (this == o) {
@@ -133,7 +141,8 @@ public boolean equals(Object o) {
133141
&& peerTags.equals(metricKey.peerTags)
134142
&& Objects.equals(serviceSource, metricKey.serviceSource)
135143
&& Objects.equals(httpMethod, metricKey.httpMethod)
136-
&& Objects.equals(httpEndpoint, metricKey.httpEndpoint);
144+
&& Objects.equals(httpEndpoint, metricKey.httpEndpoint)
145+
&& Objects.equals(grpcStatusCode, metricKey.grpcStatusCode);
137146
}
138147
return false;
139148
}

dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public final class SerializingMetricWriter implements MetricWriter {
3737
private static final byte[] PEER_TAGS = "PeerTags".getBytes(ISO_8859_1);
3838
private static final byte[] HTTP_METHOD = "HTTPMethod".getBytes(ISO_8859_1);
3939
private static final byte[] HTTP_ENDPOINT = "HTTPEndpoint".getBytes(ISO_8859_1);
40+
private static final byte[] GRPC_STATUS_CODE = "GRPCStatusCode".getBytes(ISO_8859_1);
4041
private static final byte[] SERVICE_SOURCE = "srv_src".getBytes(ISO_8859_1);
4142

4243
// Constant declared here for compile-time folding
@@ -111,8 +112,13 @@ public void add(MetricKey key, AggregateMetric aggregate) {
111112
final boolean hasHttpMethod = key.getHttpMethod() != null;
112113
final boolean hasHttpEndpoint = key.getHttpEndpoint() != null;
113114
final boolean hasServiceSource = key.getServiceSource() != null;
115+
final boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null;
114116
final int mapSize =
115-
15 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0);
117+
15
118+
+ (hasServiceSource ? 1 : 0)
119+
+ (hasHttpMethod ? 1 : 0)
120+
+ (hasHttpEndpoint ? 1 : 0)
121+
+ (hasGrpcStatusCode ? 1 : 0);
116122

117123
writer.startMap(mapSize);
118124

@@ -164,6 +170,12 @@ public void add(MetricKey key, AggregateMetric aggregate) {
164170
writer.writeUTF8(key.getHttpEndpoint());
165171
}
166172

173+
// Only include GRPCStatusCode if present (rpc-type spans)
174+
if (hasGrpcStatusCode) {
175+
writer.writeUTF8(GRPC_STATUS_CODE);
176+
writer.writeUTF8(key.getGrpcStatusCode());
177+
}
178+
167179
writer.writeUTF8(HITS);
168180
writer.writeInt(aggregate.getHitCount());
169181

dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class AggregateMetricTest extends DDSpecification {
6565
given:
6666
AggregateMetric aggregate = new AggregateMetric().recordDurations(3, new AtomicLongArray(0L, 0L, 0L | ERROR_TAG | TOP_LEVEL_TAG))
6767

68-
Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null))
68+
Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null, null))
6969
batch.add(0L, 10)
7070
batch.add(0L, 10)
7171
batch.add(0L, 10)
@@ -140,7 +140,7 @@ class AggregateMetricTest extends DDSpecification {
140140
def "consistent under concurrent attempts to read and write"() {
141141
given:
142142
AggregateMetric aggregate = new AggregateMetric()
143-
MetricKey key = new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null)
143+
MetricKey key = new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null, null)
144144
BlockingDeque<Batch> queue = new LinkedBlockingDeque<>(1000)
145145
ExecutorService reader = Executors.newSingleThreadExecutor()
146146
int writerCount = 10

0 commit comments

Comments
 (0)