diff --git a/instrumentation/grpc-1.40.0/src/main/java/io/grpc/internal/ServerImpl_Instrumentation.java b/instrumentation/grpc-1.40.0/src/main/java/io/grpc/internal/ServerImpl_Instrumentation.java index 3108e359f4..aa289953e3 100644 --- a/instrumentation/grpc-1.40.0/src/main/java/io/grpc/internal/ServerImpl_Instrumentation.java +++ b/instrumentation/grpc-1.40.0/src/main/java/io/grpc/internal/ServerImpl_Instrumentation.java @@ -42,10 +42,21 @@ private void streamCreatedInternal( StatsTraceContext statsTraceCtx) { MethodDescriptor methodDescriptor = methodDef.getMethodDescriptor(); String fullMethodName = methodDescriptor != null ? methodDescriptor.getFullMethodName() : null; - NewRelic.getAgent().getTransaction().setWebRequest(new GrpcRequest(fullMethodName, stream.getAuthority(), this.headers)); + // Check if this is a BIDI streaming method + boolean isBidiStreaming = methodDescriptor != null && + methodDescriptor.getType() == MethodDescriptor.MethodType.BIDI_STREAMING; + + // Always create transaction and set web request + NewRelic.getAgent().getTransaction().setWebRequest(new GrpcRequest(fullMethodName, stream.getAuthority(), this.headers)); stream.token = AgentBridge.getAgent().getTransaction().getToken(); + // Mark BIDI streaming for special handling (transaction ends on first response) + if (isBidiStreaming) { + stream.isBidiStreaming = true; + NewRelic.addCustomParameter("grpc.transaction_ends_on_response", true); + } + if (fullMethodName != null && !fullMethodName.isEmpty()) { NewRelic.addCustomParameter("request.method", fullMethodName); NewRelic.getAgent().getTracedMethod().setMetricName("gRPC", "ServerCallHandler", "startCall", fullMethodName); diff --git a/instrumentation/grpc-1.40.0/src/main/java/io/grpc/internal/ServerStream_Instrumentation.java b/instrumentation/grpc-1.40.0/src/main/java/io/grpc/internal/ServerStream_Instrumentation.java index f1aef60cb3..136db009eb 100644 --- a/instrumentation/grpc-1.40.0/src/main/java/io/grpc/internal/ServerStream_Instrumentation.java +++ b/instrumentation/grpc-1.40.0/src/main/java/io/grpc/internal/ServerStream_Instrumentation.java @@ -28,10 +28,45 @@ public abstract class ServerStream_Instrumentation { @NewField public Token token; + @NewField + public boolean isBidiStreaming; + + @NewField + public boolean responseSent; + + /** + * Instrumentation for writeMessage - called when server sends a response message. + * For BIDI streaming, we finalize and expire the token after the first message is sent, + * which ends the transaction at the point of response completion. + */ + @Trace(async = true) + public void writeMessage(Object message) { + // Call original method first to send the message + Weaver.callOriginal(); + + // For BIDI streaming, end transaction after first response is sent + if (isBidiStreaming && !responseSent && token != null) { + // Finalize the transaction with success status + GrpcUtil.finalizeTransaction(token, Status.OK, new Metadata()); + + // Expire the token to end the transaction + token.expire(); + + // Mark that response was sent and clear token + responseSent = true; + token = null; + + NewRelic.addCustomParameter("grpc.transaction_ended_on_response", true); + } + } + @Trace(async = true) public void close(Status status, Metadata metadata) { - GrpcUtil.finalizeTransaction(token, status, metadata); - GrpcUtil.setServerStreamResponseStatus(status); + // Only finalize if token exists (token will be null for BIDI streaming after response sent) + if (token != null) { + GrpcUtil.finalizeTransaction(token, status, metadata); + GrpcUtil.setServerStreamResponseStatus(status); + } Weaver.callOriginal(); @@ -44,8 +79,11 @@ public void close(Status status, Metadata metadata) { // server had an internal error @Trace(async = true) public void cancel(Status status) { - GrpcUtil.finalizeTransaction(token, status, new Metadata()); - GrpcUtil.setServerStreamResponseStatus(status); + // Only finalize if token exists (token will be null for BIDI streaming after response sent) + if (token != null) { + GrpcUtil.finalizeTransaction(token, status, new Metadata()); + GrpcUtil.setServerStreamResponseStatus(status); + } Weaver.callOriginal();