Skip to content

Commit cdfa917

Browse files
committed
Capture gRPC UNKNOWN requests
1 parent def88cf commit cdfa917

11 files changed

Lines changed: 216 additions & 7 deletions

File tree

instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcServerBuilderInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public static CallDepth onEnter(@Advice.This ServerBuilder<?> serverBuilder) {
5151
}
5252
if (!Boolean.TRUE.equals(SERVER_BUILDER_INSTRUMENTED.get(serverBuilder))) {
5353
serverBuilder.intercept(GrpcSingletons.SERVER_INTERCEPTOR);
54+
serverBuilder.addStreamTracerFactory(GrpcSingletons.SERVER_STREAM_TRACER_FACTORY);
5455
SERVER_BUILDER_INSTRUMENTED.set(serverBuilder, true);
5556
}
5657
return callDepth;

instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcSingletons.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.grpc.ManagedChannelBuilder;
1313
import io.grpc.ServerBuilder;
1414
import io.grpc.ServerInterceptor;
15+
import io.grpc.ServerStreamTracer;
1516
import io.opentelemetry.api.GlobalOpenTelemetry;
1617
import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties;
1718
import io.opentelemetry.instrumentation.api.incubator.config.internal.DeclarativeConfigUtil;
@@ -35,6 +36,8 @@ public final class GrpcSingletons {
3536

3637
public static final ServerInterceptor SERVER_INTERCEPTOR;
3738

39+
public static final ServerStreamTracer.Factory SERVER_STREAM_TRACER_FACTORY;
40+
3841
private static final AtomicReference<Context.Storage> STORAGE_REFERENCE = new AtomicReference<>();
3942

4043
static {
@@ -66,6 +69,7 @@ public final class GrpcSingletons {
6669

6770
CLIENT_INTERCEPTOR = telemetry.createClientInterceptor();
6871
SERVER_INTERCEPTOR = telemetry.createServerInterceptor();
72+
SERVER_STREAM_TRACER_FACTORY = telemetry.createServerStreamTracerFactory();
6973
}
7074

7175
public static Context.Storage getStorage() {

instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRequest.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212

1313
public final class GrpcRequest {
1414

15-
private final MethodDescriptor<?, ?> method;
15+
@Nullable private volatile MethodDescriptor<?, ?> method;
16+
private volatile String fullMethodName;
1617

1718
@Nullable private volatile Metadata metadata;
1819

@@ -29,11 +30,17 @@ public final class GrpcRequest {
2930
@Nullable SocketAddress peerSocketAddress,
3031
@Nullable String authority) {
3132
this.method = method;
33+
this.fullMethodName = method.getFullMethodName();
3234
this.metadata = metadata;
3335
this.peerSocketAddress = peerSocketAddress;
3436
setLogicalAddress(authority);
3537
}
3638

39+
GrpcRequest(String fullMethodName, @Nullable Metadata metadata) {
40+
this.fullMethodName = fullMethodName;
41+
this.metadata = metadata;
42+
}
43+
3744
private void setLogicalAddress(@Nullable String authority) {
3845
if (authority == null) {
3946
return;
@@ -51,10 +58,15 @@ private void setLogicalAddress(@Nullable String authority) {
5158
}
5259
}
5360

61+
@Nullable
5462
public MethodDescriptor<?, ?> getMethod() {
5563
return method;
5664
}
5765

66+
String getFullMethodName() {
67+
return fullMethodName;
68+
}
69+
5870
@Nullable
5971
public Metadata getMetadata() {
6072
return metadata;

instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRpcAttributesGetter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public String getSystem(GrpcRequest request) {
2525
@Override
2626
@Nullable
2727
public String getService(GrpcRequest request) {
28-
String fullMethodName = request.getMethod().getFullMethodName();
28+
String fullMethodName = request.getFullMethodName();
2929
int slashIndex = fullMethodName.lastIndexOf('/');
3030
if (slashIndex == -1) {
3131
return null;
@@ -37,7 +37,7 @@ public String getService(GrpcRequest request) {
3737
@Override
3838
@Nullable
3939
public String getMethod(GrpcRequest request) {
40-
String fullMethodName = request.getMethod().getFullMethodName();
40+
String fullMethodName = request.getFullMethodName();
4141
int slashIndex = fullMethodName.lastIndexOf('/');
4242
if (slashIndex == -1) {
4343
return null;

instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcSpanNameExtractor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@
1111
final class GrpcSpanNameExtractor implements SpanNameExtractor<GrpcRequest> {
1212
@Override
1313
public String extract(GrpcRequest request) {
14-
return request.getMethod().getFullMethodName();
14+
return request.getFullMethodName();
1515
}
1616
}

instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTelemetry.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import io.grpc.ClientInterceptor;
99
import io.grpc.ServerInterceptor;
10+
import io.grpc.ServerStreamTracer;
1011
import io.grpc.Status;
1112
import io.opentelemetry.api.OpenTelemetry;
1213
import io.opentelemetry.context.propagation.ContextPropagators;
@@ -61,4 +62,14 @@ public ServerInterceptor createServerInterceptor() {
6162
return new TracingServerInterceptor(
6263
serverInstrumenter, captureExperimentalSpanAttributes, emitMessageEvents);
6364
}
65+
66+
/**
67+
* Returns a new {@link ServerStreamTracer.Factory} for use with methods like {@link
68+
* io.grpc.ServerBuilder#addStreamTracerFactory(ServerStreamTracer.Factory)}. This enables
69+
* creating server spans for requests to unregistered services, which are not seen by server
70+
* interceptors. Should be used together with {@link #createServerInterceptor()}.
71+
*/
72+
public ServerStreamTracer.Factory createServerStreamTracerFactory() {
73+
return new TracingServerStreamTracerFactory(serverInstrumenter, propagators);
74+
}
6475
}

instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingServerInterceptor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(
7070
// field.
7171
authority = GrpcAuthorityStorage.getAuthority(call);
7272
}
73+
74+
// If a ServerStreamTracer is active, mark it as handled so it won't create a span for this
75+
// request in streamClosed().
76+
TracingServerStreamTracer streamTracer = TracingServerStreamTracer.STREAM_TRACER_KEY.get();
77+
if (streamTracer != null) {
78+
streamTracer.markInterceptorHandled();
79+
}
80+
7381
GrpcRequest request =
7482
new GrpcRequest(
7583
call.getMethodDescriptor(),
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.grpc.v1_6;
7+
8+
import io.grpc.Grpc;
9+
import io.grpc.Metadata;
10+
import io.grpc.ServerCall;
11+
import io.grpc.ServerStreamTracer;
12+
import io.grpc.Status;
13+
import io.opentelemetry.context.Context;
14+
import io.opentelemetry.context.propagation.ContextPropagators;
15+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
16+
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
17+
import java.net.SocketAddress;
18+
import java.time.Instant;
19+
import javax.annotation.Nullable;
20+
21+
/**
22+
* A {@link ServerStreamTracer} that detects whether a gRPC server request was handled by the {@link
23+
* TracingServerInterceptor}. If the interceptor does not fire (unregistered method), {@link
24+
* #streamClosed(Status)} creates a span for the unhandled request.
25+
*/
26+
final class TracingServerStreamTracer extends ServerStreamTracer {
27+
28+
static final io.grpc.Context.Key<TracingServerStreamTracer> STREAM_TRACER_KEY =
29+
io.grpc.Context.key("otel-grpc-stream-tracer");
30+
31+
private static final String UNKNOWN_METHOD_SPAN_NAME = "_OTHER";
32+
33+
private final Instrumenter<GrpcRequest, Status> instrumenter;
34+
private final ContextPropagators propagators;
35+
private final Metadata headers;
36+
private final Context parentContext;
37+
private final Instant startTime;
38+
39+
private volatile boolean interceptorHandled;
40+
@Nullable private volatile SocketAddress peerAddress;
41+
42+
TracingServerStreamTracer(
43+
Instrumenter<GrpcRequest, Status> instrumenter,
44+
ContextPropagators propagators,
45+
Metadata headers,
46+
Context parentContext) {
47+
this.instrumenter = instrumenter;
48+
this.propagators = propagators;
49+
this.headers = headers;
50+
this.parentContext = parentContext;
51+
this.startTime = Instant.now();
52+
}
53+
54+
void markInterceptorHandled() {
55+
interceptorHandled = true;
56+
}
57+
58+
@Override
59+
public io.grpc.Context filterContext(io.grpc.Context context) {
60+
return context.withValue(STREAM_TRACER_KEY, this);
61+
}
62+
63+
@Override
64+
public void serverCallStarted(ServerCall<?, ?> call) {
65+
if (peerAddress == null) {
66+
SocketAddress addr = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
67+
if (addr != null) {
68+
peerAddress = addr;
69+
}
70+
}
71+
}
72+
73+
@Override
74+
public void streamClosed(Status status) {
75+
if (interceptorHandled) {
76+
return;
77+
}
78+
// Interceptor did not fire — this is an unregistered method
79+
GrpcRequest request = new GrpcRequest(UNKNOWN_METHOD_SPAN_NAME, headers);
80+
if (peerAddress != null) {
81+
request.setPeerSocketAddress(peerAddress);
82+
}
83+
// Extract trace context from incoming headers (e.g. W3C traceparent)
84+
Context extracted =
85+
propagators
86+
.getTextMapPropagator()
87+
.extract(parentContext, request, GrpcRequestGetter.INSTANCE);
88+
if (instrumenter.shouldStart(extracted, request)) {
89+
InstrumenterUtil.startAndEnd(
90+
instrumenter, extracted, request, status, status.getCause(), startTime, Instant.now());
91+
}
92+
}
93+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.grpc.v1_6;
7+
8+
import io.grpc.Metadata;
9+
import io.grpc.ServerStreamTracer;
10+
import io.grpc.Status;
11+
import io.opentelemetry.context.Context;
12+
import io.opentelemetry.context.propagation.ContextPropagators;
13+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
14+
15+
final class TracingServerStreamTracerFactory extends ServerStreamTracer.Factory {
16+
17+
private final Instrumenter<GrpcRequest, Status> instrumenter;
18+
private final ContextPropagators propagators;
19+
20+
TracingServerStreamTracerFactory(
21+
Instrumenter<GrpcRequest, Status> instrumenter, ContextPropagators propagators) {
22+
this.instrumenter = instrumenter;
23+
this.propagators = propagators;
24+
}
25+
26+
@Override
27+
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
28+
return new TracingServerStreamTracer(instrumenter, propagators, headers, Context.current());
29+
}
30+
}

instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@ class GrpcTest extends AbstractGrpcTest {
4444

4545
@Override
4646
protected ServerBuilder<?> configureServer(ServerBuilder<?> server) {
47-
return server.intercept(
47+
GrpcTelemetry telemetry =
4848
GrpcTelemetry.builder(testing.getOpenTelemetry())
4949
.setCapturedServerRequestMetadata(
5050
Collections.singletonList(SERVER_REQUEST_METADATA_KEY))
51-
.build()
52-
.createServerInterceptor());
51+
.build();
52+
return server
53+
.intercept(telemetry.createServerInterceptor())
54+
.addStreamTracerFactory(telemetry.createServerStreamTracerFactory());
5355
}
5456

5557
@Override

0 commit comments

Comments
 (0)