Skip to content

Commit 0248e6f

Browse files
authored
core,opentelemetry: Fix server metric labels on early close (#12774)
This addresses the server-side OpenTelemetry metric labeling bug from #12117 where a generated method can be recorded as `grpc.method="other"` if `streamClosed()` happens before `serverCallStarted()`. ### What changed - add an internal `StatsTraceContext.ServerCallMethodListener` hook so tracers can consume an already-resolved primary-registry `MethodDescriptor` - resolve the immutable internal primary registry on the transport path and seed method classification before the async `MethodLookup` path runs - keep fallback registry lookup on the existing async path - update the OpenTelemetry server tracer to use the early-resolved method classification for close metrics ### Why this shape - avoids tracer-side `HandlerRegistry` lookup - uses only the immutable internal primary registry for early transport-path lookup - keeps fallback registry lookup on the existing async path ### Tests - primary generated method: early close preserves the generated method name - primary non-generated method: early close still records `other` - fallback generated method: fallback lookup remains on the existing async path and does not introduce early transport-path classification - tracer-level regression: `serverCallMethodResolved()` + `streamClosed()` records the generated method name without waiting for `serverCallStarted()` ### Notes - `ServerCallMethodListener` is an internal hook that carries the resolved `MethodDescriptor`; tracers consume the resolved result instead of performing registry lookup themselves - `ServerImpl` uses `InternalHandlerRegistry` explicitly for the primary registry to make it clear that the early transport- path lookup is limited to the immutable internal primary registry - this PR intentionally does not widen transport-path lookup to the fallback registry Ref #12117
1 parent 324fce7 commit 0248e6f

5 files changed

Lines changed: 405 additions & 9 deletions

File tree

core/src/main/java/io/grpc/internal/ServerImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
9999
private final ObjectPool<? extends Executor> executorPool;
100100
/** Executor for application processing. Safe to read after {@link #start()}. */
101101
private Executor executor;
102-
private final HandlerRegistry registry;
102+
private final InternalHandlerRegistry registry;
103103
private final HandlerRegistry fallbackRegistry;
104104
private final List<ServerTransportFilter> transportFilters;
105105
// This is iterated on a per-call basis. Use an array instead of a Collection to avoid iterator
@@ -498,8 +498,12 @@ private void streamCreatedInternal(
498498

499499
final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull(
500500
stream.statsTraceContext(), "statsTraceCtx not present from stream");
501+
final ServerMethodDefinition<?, ?> primaryMethod = registry.lookupMethod(methodName, null);
501502

502503
final Context.CancellableContext context = createContext(headers, statsTraceCtx);
504+
if (primaryMethod != null) {
505+
statsTraceCtx.serverCallMethodResolved(primaryMethod.getMethodDescriptor());
506+
}
503507

504508
final Link link = PerfMark.linkOut();
505509

@@ -536,7 +540,7 @@ private void runInternal() {
536540
ServerMethodDefinition<?, ?> wrapMethod;
537541
ServerCallParameters<?, ?> callParams;
538542
try {
539-
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
543+
ServerMethodDefinition<?, ?> method = primaryMethod;
540544
if (method == null) {
541545
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
542546
}

core/src/main/java/io/grpc/internal/StatsTraceContext.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.grpc.ClientStreamTracer;
2424
import io.grpc.Context;
2525
import io.grpc.Metadata;
26+
import io.grpc.MethodDescriptor;
2627
import io.grpc.ServerStreamTracer;
2728
import io.grpc.ServerStreamTracer.ServerCallInfo;
2829
import io.grpc.Status;
@@ -38,6 +39,14 @@
3839
*/
3940
@ThreadSafe
4041
public final class StatsTraceContext {
42+
/**
43+
* Internal hook for server tracers that can use the resolved method descriptor before
44+
* {@link ServerStreamTracer#serverCallStarted(ServerCallInfo)} runs.
45+
*/
46+
public interface ServerCallMethodListener {
47+
void serverCallMethodResolved(MethodDescriptor<?, ?> method);
48+
}
49+
4150
public static final StatsTraceContext NOOP = new StatsTraceContext(new StreamTracer[0]);
4251

4352
private final StreamTracer[] tracers;
@@ -144,6 +153,20 @@ public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
144153
}
145154
}
146155

156+
/**
157+
* Notifies server tracers that a primary-registry method descriptor was resolved before
158+
* {@link ServerStreamTracer#serverCallStarted(ServerCallInfo)}.
159+
*
160+
* <p>Called from {@link io.grpc.internal.ServerImpl}.
161+
*/
162+
public void serverCallMethodResolved(MethodDescriptor<?, ?> method) {
163+
for (StreamTracer tracer : tracers) {
164+
if (tracer instanceof ServerCallMethodListener) {
165+
((ServerCallMethodListener) tracer).serverCallMethodResolved(method);
166+
}
167+
}
168+
}
169+
147170
/**
148171
* See {@link StreamTracer#streamClosed}. This may be called multiple times, and only the first
149172
* value will be taken.

core/src/test/java/io/grpc/internal/ServerImplTest.java

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ public class ServerImplTest {
129129
.setRequestMarshaller(STRING_MARSHALLER)
130130
.setResponseMarshaller(INTEGER_MARSHALLER)
131131
.build();
132+
private static final MethodDescriptor<String, Integer> GENERATED_METHOD =
133+
METHOD.toBuilder()
134+
.setSampledToLocalTracing(true)
135+
.build();
132136
private static final Context.Key<String> SERVER_ONLY = Context.key("serverOnly");
133137
private static final Context.Key<String> SERVER_TRACER_ADDED_KEY = Context.key("tracer-added");
134138
private static final Context.CancellableContext SERVER_CONTEXT =
@@ -142,6 +146,60 @@ public boolean shouldAccept(Runnable runnable) {
142146
};
143147
private static final String AUTHORITY = "some_authority";
144148

149+
private static final class MethodNameCapturingTracer extends ServerStreamTracer
150+
implements StatsTraceContext.ServerCallMethodListener {
151+
@Nullable private ServerCallInfo<?, ?> serverCallInfo;
152+
@Nullable private String recordedMethodName;
153+
@Nullable private String resolvedMethodName;
154+
private boolean streamClosed;
155+
156+
@Override
157+
public synchronized void serverCallMethodResolved(MethodDescriptor<?, ?> method) {
158+
resolvedMethodName =
159+
recordMethodName(method.isSampledToLocalTracing(), method.getFullMethodName());
160+
}
161+
162+
@Override
163+
public synchronized void streamClosed(Status status) {
164+
streamClosed = true;
165+
if (serverCallInfo != null) {
166+
recordedMethodName =
167+
recordMethodName(
168+
serverCallInfo.getMethodDescriptor().isSampledToLocalTracing(),
169+
serverCallInfo.getMethodDescriptor().getFullMethodName());
170+
} else if (resolvedMethodName != null) {
171+
recordedMethodName = resolvedMethodName;
172+
} else {
173+
recordedMethodName = "other";
174+
}
175+
}
176+
177+
@Override
178+
public synchronized void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
179+
serverCallInfo = callInfo;
180+
if (streamClosed) {
181+
recordedMethodName =
182+
recordMethodName(
183+
callInfo.getMethodDescriptor().isSampledToLocalTracing(),
184+
callInfo.getMethodDescriptor().getFullMethodName());
185+
}
186+
}
187+
188+
@Nullable
189+
synchronized ServerCallInfo<?, ?> getServerCallInfo() {
190+
return serverCallInfo;
191+
}
192+
193+
@Nullable
194+
synchronized String getRecordedMethodName() {
195+
return recordedMethodName;
196+
}
197+
198+
private static String recordMethodName(boolean generatedMethod, String fullMethodName) {
199+
return generatedMethod ? fullMethodName : "other";
200+
}
201+
}
202+
145203
@Rule public final MockitoRule mocks = MockitoJUnit.rule();
146204

147205
@BeforeClass
@@ -462,6 +520,172 @@ public void methodNotFound() throws Exception {
462520
assertEquals(Status.Code.UNIMPLEMENTED, statusCaptor.getValue().getCode());
463521
}
464522

523+
@Test
524+
public void primaryRegistryGeneratedMethod_streamClosedBeforeStart_preservesMethodName()
525+
throws Exception {
526+
MethodNameCapturingTracer methodNameTracer = new MethodNameCapturingTracer();
527+
streamTracerFactories =
528+
Collections.singletonList(
529+
new ServerStreamTracer.Factory() {
530+
@Override
531+
public ServerStreamTracer newServerStreamTracer(
532+
String fullMethodName, Metadata headers) {
533+
return methodNameTracer;
534+
}
535+
});
536+
builder.addService(
537+
ServerServiceDefinition.builder(new ServiceDescriptor("Waiter", GENERATED_METHOD))
538+
.addMethod(
539+
GENERATED_METHOD,
540+
new ServerCallHandler<String, Integer>() {
541+
@Override
542+
public ServerCall.Listener<String> startCall(
543+
ServerCall<String, Integer> call, Metadata headers) {
544+
return callListener;
545+
}
546+
})
547+
.build());
548+
549+
createAndStartServer();
550+
ServerTransportListener transportListener
551+
= transportServer.registerNewServerTransport(new SimpleServerTransport());
552+
transportListener.transportReady(Attributes.EMPTY);
553+
Metadata requestHeaders = new Metadata();
554+
StatsTraceContext statsTraceCtx =
555+
StatsTraceContext.newServerContext(
556+
streamTracerFactories, GENERATED_METHOD.getFullMethodName(), requestHeaders);
557+
when(stream.getAttributes()).thenReturn(Attributes.EMPTY);
558+
when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
559+
560+
transportListener.streamCreated(stream, GENERATED_METHOD.getFullMethodName(), requestHeaders);
561+
verify(stream).setListener(isA(ServerStreamListener.class));
562+
verify(stream, atLeast(1)).statsTraceContext();
563+
564+
statsTraceCtx.streamClosed(Status.CANCELLED);
565+
assertNull(methodNameTracer.getServerCallInfo());
566+
assertEquals(
567+
GENERATED_METHOD.getFullMethodName(),
568+
methodNameTracer.getRecordedMethodName());
569+
570+
assertEquals(1, executor.runDueTasks());
571+
572+
assertNotNull(methodNameTracer.getServerCallInfo());
573+
assertSame(GENERATED_METHOD, methodNameTracer.getServerCallInfo().getMethodDescriptor());
574+
assertEquals(
575+
GENERATED_METHOD.getFullMethodName(),
576+
methodNameTracer.getRecordedMethodName());
577+
verify(fallbackRegistry, never()).lookupMethod(anyString(), any());
578+
}
579+
580+
@Test
581+
public void primaryRegistryNonGeneratedMethod_streamClosedBeforeStart_recordsOther()
582+
throws Exception {
583+
MethodNameCapturingTracer methodNameTracer = new MethodNameCapturingTracer();
584+
streamTracerFactories =
585+
Collections.singletonList(
586+
new ServerStreamTracer.Factory() {
587+
@Override
588+
public ServerStreamTracer newServerStreamTracer(
589+
String fullMethodName, Metadata headers) {
590+
return methodNameTracer;
591+
}
592+
});
593+
builder.addService(
594+
ServerServiceDefinition.builder(new ServiceDescriptor("Waiter", METHOD))
595+
.addMethod(
596+
METHOD,
597+
new ServerCallHandler<String, Integer>() {
598+
@Override
599+
public ServerCall.Listener<String> startCall(
600+
ServerCall<String, Integer> call, Metadata headers) {
601+
return callListener;
602+
}
603+
})
604+
.build());
605+
606+
createAndStartServer();
607+
ServerTransportListener transportListener
608+
= transportServer.registerNewServerTransport(new SimpleServerTransport());
609+
transportListener.transportReady(Attributes.EMPTY);
610+
Metadata requestHeaders = new Metadata();
611+
StatsTraceContext statsTraceCtx =
612+
StatsTraceContext.newServerContext(
613+
streamTracerFactories, METHOD.getFullMethodName(), requestHeaders);
614+
when(stream.getAttributes()).thenReturn(Attributes.EMPTY);
615+
when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
616+
617+
transportListener.streamCreated(stream, METHOD.getFullMethodName(), requestHeaders);
618+
verify(stream).setListener(isA(ServerStreamListener.class));
619+
verify(stream, atLeast(1)).statsTraceContext();
620+
621+
statsTraceCtx.streamClosed(Status.CANCELLED);
622+
assertNull(methodNameTracer.getServerCallInfo());
623+
assertEquals("other", methodNameTracer.getRecordedMethodName());
624+
625+
assertEquals(1, executor.runDueTasks());
626+
627+
assertNotNull(methodNameTracer.getServerCallInfo());
628+
assertSame(METHOD, methodNameTracer.getServerCallInfo().getMethodDescriptor());
629+
assertEquals("other", methodNameTracer.getRecordedMethodName());
630+
verify(fallbackRegistry, never()).lookupMethod(anyString(), any());
631+
}
632+
633+
@Test
634+
public void fallbackRegistryGeneratedMethod_streamClosedBeforeStart_resolvesOnAsyncLookup()
635+
throws Exception {
636+
MethodNameCapturingTracer methodNameTracer = new MethodNameCapturingTracer();
637+
streamTracerFactories =
638+
Collections.singletonList(
639+
new ServerStreamTracer.Factory() {
640+
@Override
641+
public ServerStreamTracer newServerStreamTracer(
642+
String fullMethodName, Metadata headers) {
643+
return methodNameTracer;
644+
}
645+
});
646+
mutableFallbackRegistry.addService(
647+
ServerServiceDefinition.builder(new ServiceDescriptor("Waiter", GENERATED_METHOD))
648+
.addMethod(
649+
GENERATED_METHOD,
650+
new ServerCallHandler<String, Integer>() {
651+
@Override
652+
public ServerCall.Listener<String> startCall(
653+
ServerCall<String, Integer> call, Metadata headers) {
654+
return callListener;
655+
}
656+
})
657+
.build());
658+
659+
createAndStartServer();
660+
ServerTransportListener transportListener
661+
= transportServer.registerNewServerTransport(new SimpleServerTransport());
662+
transportListener.transportReady(Attributes.EMPTY);
663+
Metadata requestHeaders = new Metadata();
664+
StatsTraceContext statsTraceCtx =
665+
StatsTraceContext.newServerContext(
666+
streamTracerFactories, GENERATED_METHOD.getFullMethodName(), requestHeaders);
667+
when(stream.getAttributes()).thenReturn(Attributes.EMPTY);
668+
when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
669+
670+
transportListener.streamCreated(stream, GENERATED_METHOD.getFullMethodName(), requestHeaders);
671+
verify(stream).setListener(isA(ServerStreamListener.class));
672+
verify(stream, atLeast(1)).statsTraceContext();
673+
674+
statsTraceCtx.streamClosed(Status.CANCELLED);
675+
assertNull(methodNameTracer.getServerCallInfo());
676+
assertEquals("other", methodNameTracer.getRecordedMethodName());
677+
verify(fallbackRegistry, never()).lookupMethod(anyString(), any());
678+
679+
assertEquals(1, executor.runDueTasks());
680+
681+
assertNotNull(methodNameTracer.getServerCallInfo());
682+
assertSame(GENERATED_METHOD, methodNameTracer.getServerCallInfo().getMethodDescriptor());
683+
assertEquals(
684+
GENERATED_METHOD.getFullMethodName(),
685+
methodNameTracer.getRecordedMethodName());
686+
verify(fallbackRegistry).lookupMethod(GENERATED_METHOD.getFullMethodName(), AUTHORITY);
687+
}
688+
465689

466690
@Test
467691
public void executorSupplierSameExecutorBasic() throws Exception {

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import io.grpc.Status;
4848
import io.grpc.Status.Code;
4949
import io.grpc.StreamTracer;
50+
import io.grpc.internal.StatsTraceContext.ServerCallMethodListener;
5051
import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter;
5152
import io.opentelemetry.api.baggage.Baggage;
5253
import io.opentelemetry.api.common.AttributesBuilder;
@@ -526,7 +527,8 @@ void recordFinishedCall(CallOptions callOptions) {
526527
}
527528
}
528529

529-
private static final class ServerTracer extends ServerStreamTracer {
530+
private static final class ServerTracer extends ServerStreamTracer
531+
implements ServerCallMethodListener {
530532
@Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
531533
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
532534
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;
@@ -587,6 +589,11 @@ public io.grpc.Context filterContext(io.grpc.Context context) {
587589
return context;
588590
}
589591

592+
@Override
593+
public void serverCallMethodResolved(MethodDescriptor<?, ?> method) {
594+
isGeneratedMethod = method.isSampledToLocalTracing();
595+
}
596+
590597
@Override
591598
public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
592599
// Only record method name as an attribute if isSampledToLocalTracing is set to true,
@@ -644,9 +651,24 @@ public void streamClosed(Status status) {
644651
}
645652
stopwatch.stop();
646653
long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
647-
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
648-
.put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
649-
.put(STATUS_KEY, status.getCode().toString());
654+
recordClosedStream(
655+
status,
656+
elapsedTimeNanos,
657+
outboundWireSize,
658+
inboundWireSize,
659+
isGeneratedMethod);
660+
}
661+
662+
private void recordClosedStream(
663+
Status status,
664+
long elapsedTimeNanos,
665+
long closedOutboundWireSize,
666+
long closedInboundWireSize,
667+
boolean generatedMethod) {
668+
AttributesBuilder builder =
669+
io.opentelemetry.api.common.Attributes.builder()
670+
.put(METHOD_KEY, recordMethodName(fullMethodName, generatedMethod))
671+
.put(STATUS_KEY, status.getCode().toString());
650672
for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
651673
plugin.addLabels(builder);
652674
}
@@ -658,11 +680,11 @@ public void streamClosed(Status status) {
658680
}
659681
if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
660682
module.resource.serverTotalSentCompressedMessageSizeCounter()
661-
.record(outboundWireSize, attributes, otelContext);
683+
.record(closedOutboundWireSize, attributes, otelContext);
662684
}
663685
if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
664686
module.resource.serverTotalReceivedCompressedMessageSizeCounter()
665-
.record(inboundWireSize, attributes, otelContext);
687+
.record(closedInboundWireSize, attributes, otelContext);
666688
}
667689
}
668690
}
@@ -744,4 +766,3 @@ public void onClose(Status status, Metadata trailers) {
744766
}
745767
}
746768
}
747-

0 commit comments

Comments
 (0)