Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,21 @@ private void streamCreatedInternal(
StatsTraceContext statsTraceCtx) {
MethodDescriptor<ReqT, RespT> methodDescriptor = methodDef.getMethodDescriptor();
String fullMethodName = methodDescriptor != null ? methodDescriptor.getFullMethodName() : null;
NewRelic.getAgent().getTransaction().setWebRequest(new GrpcRequest(fullMethodName, stream.getAuthority(), this.headers));

// Check if this is a BIDI streaming method
boolean isBidiStreaming = methodDescriptor != null &&
methodDescriptor.getType() == MethodDescriptor.MethodType.BIDI_STREAMING;

// Always create transaction and set web request
NewRelic.getAgent().getTransaction().setWebRequest(new GrpcRequest(fullMethodName, stream.getAuthority(), this.headers));
stream.token = AgentBridge.getAgent().getTransaction().getToken();

// Mark BIDI streaming for special handling (transaction ends on first response)
if (isBidiStreaming) {
stream.isBidiStreaming = true;
NewRelic.addCustomParameter("grpc.transaction_ends_on_response", true);
}

if (fullMethodName != null && !fullMethodName.isEmpty()) {
NewRelic.addCustomParameter("request.method", fullMethodName);
NewRelic.getAgent().getTracedMethod().setMetricName("gRPC", "ServerCallHandler", "startCall", fullMethodName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,45 @@ public abstract class ServerStream_Instrumentation {
@NewField
public Token token;

@NewField
public boolean isBidiStreaming;

@NewField
public boolean responseSent;

/**
* Instrumentation for writeMessage - called when server sends a response message.
* For BIDI streaming, we finalize and expire the token after the first message is sent,
* which ends the transaction at the point of response completion.
*/
@Trace(async = true)
public void writeMessage(Object message) {
// Call original method first to send the message
Weaver.callOriginal();

// For BIDI streaming, end transaction after first response is sent
if (isBidiStreaming && !responseSent && token != null) {
// Finalize the transaction with success status
GrpcUtil.finalizeTransaction(token, Status.OK, new Metadata());

// Expire the token to end the transaction
token.expire();

// Mark that response was sent and clear token
responseSent = true;
token = null;

NewRelic.addCustomParameter("grpc.transaction_ended_on_response", true);
}
}

@Trace(async = true)
public void close(Status status, Metadata metadata) {
GrpcUtil.finalizeTransaction(token, status, metadata);
GrpcUtil.setServerStreamResponseStatus(status);
// Only finalize if token exists (token will be null for BIDI streaming after response sent)
if (token != null) {
GrpcUtil.finalizeTransaction(token, status, metadata);
GrpcUtil.setServerStreamResponseStatus(status);
}

Weaver.callOriginal();

Expand All @@ -44,8 +79,11 @@ public void close(Status status, Metadata metadata) {
// server had an internal error
@Trace(async = true)
public void cancel(Status status) {
GrpcUtil.finalizeTransaction(token, status, new Metadata());
GrpcUtil.setServerStreamResponseStatus(status);
// Only finalize if token exists (token will be null for BIDI streaming after response sent)
if (token != null) {
GrpcUtil.finalizeTransaction(token, status, new Metadata());
GrpcUtil.setServerStreamResponseStatus(status);
}

Weaver.callOriginal();

Expand Down
Loading