Skip to content

Commit 2f77e2d

Browse files
committed
feat: Implement trace context extraction and injection interceptors
1 parent 39133b5 commit 2f77e2d

File tree

13 files changed

+143
-18
lines changed

13 files changed

+143
-18
lines changed

sdk-platform-java/gax-java/gax-grpc/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515
</parent>
1616

1717
<dependencies>
18+
<dependency>
19+
<groupId>io.opentelemetry</groupId>
20+
<artifactId>opentelemetry-api</artifactId>
21+
</dependency>
22+
<dependency>
23+
<groupId>io.opentelemetry</groupId>
24+
<artifactId>opentelemetry-context</artifactId>
25+
</dependency>
1826
<dependency>
1927
<groupId>com.google.api</groupId>
2028
<artifactId>gax</artifactId>
@@ -100,6 +108,16 @@
100108
</dependency>
101109

102110
<!-- test dependencies -->
111+
<dependency>
112+
<groupId>io.opentelemetry</groupId>
113+
<artifactId>opentelemetry-sdk</artifactId>
114+
<scope>test</scope>
115+
</dependency>
116+
<dependency>
117+
<groupId>io.opentelemetry</groupId>
118+
<artifactId>opentelemetry-sdk-testing</artifactId>
119+
<scope>test</scope>
120+
</dependency>
103121
<dependency>
104122
<groupId>io.grpc</groupId>
105123
<artifactId>grpc-s2a</artifactId>

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcClientCalls.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,16 @@ public static <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
9090
channel = ((ChannelPool) channel).getChannel(grpcContext.getChannelAffinity());
9191
}
9292

93-
if (!grpcContext.getExtraHeaders().isEmpty()) {
94-
ClientInterceptor interceptor =
95-
MetadataUtils.newAttachHeadersInterceptor(grpcContext.getMetadata());
93+
java.util.Map<String, String> traceContext = new java.util.HashMap<>();
94+
grpcContext.getTracer().injectTraceContext(traceContext);
95+
96+
if (!grpcContext.getExtraHeaders().isEmpty() || !traceContext.isEmpty()) {
97+
Metadata metadata = grpcContext.getMetadata();
98+
for (java.util.Map.Entry<String, String> entry : traceContext.entrySet()) {
99+
metadata.put(
100+
Metadata.Key.of(entry.getKey(), Metadata.ASCII_STRING_MARSHALLER), entry.getValue());
101+
}
102+
ClientInterceptor interceptor = MetadataUtils.newAttachHeadersInterceptor(metadata);
96103
channel = ClientInterceptors.intercept(channel, interceptor);
97104
}
98105

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectBidiStreamingCallable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.google.api.gax.rpc.ClientStream;
3535
import com.google.api.gax.rpc.ClientStreamReadyObserver;
3636
import com.google.api.gax.rpc.ResponseObserver;
37+
import com.google.api.gax.tracing.ApiTracer.Scope;
3738
import com.google.common.base.Preconditions;
3839
import io.grpc.ClientCall;
3940
import io.grpc.MethodDescriptor;
@@ -93,7 +94,9 @@ public void run() {
9394
onReady.onReady(clientStream);
9495
}
9596
});
96-
controller.startBidi();
97+
try (Scope ignored = context.getTracer().inScope()) {
98+
controller.startBidi();
99+
}
97100

98101
return clientStream;
99102
}

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectCallable.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.google.api.core.ListenableFutureToApiFuture;
3535
import com.google.api.gax.rpc.ApiCallContext;
3636
import com.google.api.gax.rpc.UnaryCallable;
37+
import com.google.api.gax.tracing.ApiTracer.Scope;
3738
import com.google.common.base.Preconditions;
3839
import io.grpc.ClientCall;
3940
import io.grpc.MethodDescriptor;
@@ -61,10 +62,12 @@ public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext inputCon
6162

6263
ClientCall<RequestT, ResponseT> clientCall = GrpcClientCalls.newCall(descriptor, inputContext);
6364

64-
if (awaitTrailers) {
65-
return new ListenableFutureToApiFuture<>(ClientCalls.futureUnaryCall(clientCall, request));
66-
} else {
67-
return GrpcClientCalls.eagerFutureUnaryCall(clientCall, request);
65+
try (Scope ignored = inputContext.getTracer().inScope()) {
66+
if (awaitTrailers) {
67+
return new ListenableFutureToApiFuture<>(ClientCalls.futureUnaryCall(clientCall, request));
68+
} else {
69+
return GrpcClientCalls.eagerFutureUnaryCall(clientCall, request);
70+
}
6871
}
6972
}
7073

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectClientStreamingCallable.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.google.api.gax.rpc.ApiCallContext;
3333
import com.google.api.gax.rpc.ApiStreamObserver;
3434
import com.google.api.gax.rpc.ClientStreamingCallable;
35+
import com.google.api.gax.tracing.ApiTracer.Scope;
3536
import com.google.common.base.Preconditions;
3637
import io.grpc.ClientCall;
3738
import io.grpc.MethodDescriptor;
@@ -57,8 +58,10 @@ public ApiStreamObserver<RequestT> clientStreamingCall(
5758
ApiStreamObserver<ResponseT> responseObserver, ApiCallContext context) {
5859
Preconditions.checkNotNull(responseObserver);
5960
ClientCall<RequestT, ResponseT> call = GrpcClientCalls.newCall(descriptor, context);
60-
return new StreamObserverDelegate<>(
61-
ClientCalls.asyncClientStreamingCall(
62-
call, new ApiStreamObserverDelegate<>(responseObserver)));
61+
try (Scope ignored = context.getTracer().inScope()) {
62+
return new StreamObserverDelegate<>(
63+
ClientCalls.asyncClientStreamingCall(
64+
call, new ApiStreamObserverDelegate<>(responseObserver)));
65+
}
6366
}
6467
}

sdk-platform-java/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectServerStreamingCallable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.api.gax.rpc.ResponseObserver;
3434
import com.google.api.gax.rpc.ServerStreamingCallable;
3535
import com.google.api.gax.rpc.StreamController;
36+
import com.google.api.gax.tracing.ApiTracer.Scope;
3637
import com.google.common.base.Preconditions;
3738
import io.grpc.ClientCall;
3839
import io.grpc.MethodDescriptor;
@@ -65,6 +66,8 @@ public void call(
6566
ClientCall<RequestT, ResponseT> call = GrpcClientCalls.newCall(descriptor, context);
6667
GrpcDirectStreamController<RequestT, ResponseT> controller =
6768
new GrpcDirectStreamController<>(call, responseObserver);
68-
controller.start(request);
69+
try (Scope ignored = context.getTracer().inScope()) {
70+
controller.start(request);
71+
}
6972
}
7073
}

sdk-platform-java/gax-java/gax-httpjson/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@
2020
</properties>
2121

2222
<dependencies>
23+
<dependency>
24+
<groupId>io.opentelemetry</groupId>
25+
<artifactId>opentelemetry-api</artifactId>
26+
</dependency>
27+
<dependency>
28+
<groupId>io.opentelemetry</groupId>
29+
<artifactId>opentelemetry-context</artifactId>
30+
</dependency>
2331
<dependency>
2432
<groupId>com.google.api</groupId>
2533
<artifactId>gax</artifactId>
@@ -86,6 +94,16 @@
8694
</dependency>
8795

8896
<!-- test dependencies -->
97+
<dependency>
98+
<groupId>io.opentelemetry</groupId>
99+
<artifactId>opentelemetry-sdk</artifactId>
100+
<scope>test</scope>
101+
</dependency>
102+
<dependency>
103+
<groupId>io.opentelemetry</groupId>
104+
<artifactId>opentelemetry-sdk-testing</artifactId>
105+
<scope>test</scope>
106+
</dependency>
89107
<dependency>
90108
<groupId>com.google.api</groupId>
91109
<artifactId>gax</artifactId>

sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCalls.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,25 @@ public static <RequestT, ResponseT> HttpJsonClientCall<RequestT, ResponseT> newC
8080
return httpJsonContext.getChannel().newCall(methodDescriptor, httpJsonContext.getCallOptions());
8181
}
8282

83+
static HttpJsonMetadata getMetadataWithTraceContext(HttpJsonCallContext context) {
84+
java.util.Map<String, String> traceHeaders = new java.util.HashMap<>();
85+
context.getTracer().injectTraceContext(traceHeaders);
86+
87+
java.util.Map<String, java.util.List<String>> finalHeaders =
88+
new java.util.HashMap<>(context.getExtraHeaders());
89+
for (java.util.Map.Entry<String, String> entry : traceHeaders.entrySet()) {
90+
finalHeaders.put(entry.getKey(), java.util.Collections.singletonList(entry.getValue()));
91+
}
92+
return HttpJsonMetadata.newBuilder().build().withHeaders(finalHeaders);
93+
}
94+
8395
static <RequestT, ResponseT> ApiFuture<ResponseT> futureUnaryCall(
8496
HttpJsonClientCall<RequestT, ResponseT> clientCall,
8597
RequestT request,
8698
HttpJsonCallContext context) {
8799
// Start the call
88100
HttpJsonFuture<ResponseT> future = new HttpJsonFuture<>(clientCall);
89-
clientCall.start(
90-
new FutureListener<>(future),
91-
HttpJsonMetadata.newBuilder().build().withHeaders(context.getExtraHeaders()));
101+
clientCall.start(new FutureListener<>(future), getMetadataWithTraceContext(context));
92102

93103
// Send the request
94104
try {

sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectCallable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.google.api.core.ApiFuture;
3333
import com.google.api.gax.rpc.ApiCallContext;
3434
import com.google.api.gax.rpc.UnaryCallable;
35+
import com.google.api.gax.tracing.ApiTracer.Scope;
3536
import com.google.common.base.Preconditions;
3637
import com.google.protobuf.TypeRegistry;
3738

@@ -65,7 +66,9 @@ public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext inputCon
6566

6667
HttpJsonClientCall<RequestT, ResponseT> clientCall =
6768
HttpJsonClientCalls.newCall(descriptor, context);
68-
return HttpJsonClientCalls.futureUnaryCall(clientCall, request, context);
69+
try (Scope ignored = context.getTracer().inScope()) {
70+
return HttpJsonClientCalls.futureUnaryCall(clientCall, request, context);
71+
}
6972
}
7073

7174
@Override

sdk-platform-java/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectServerStreamingCallable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.api.gax.rpc.ResponseObserver;
3434
import com.google.api.gax.rpc.ServerStreamingCallable;
3535
import com.google.api.gax.rpc.StreamController;
36+
import com.google.api.gax.tracing.ApiTracer.Scope;
3637
import com.google.common.base.Preconditions;
3738

3839
/**
@@ -64,6 +65,8 @@ public void call(
6465
HttpJsonClientCall<RequestT, ResponseT> call = HttpJsonClientCalls.newCall(descriptor, context);
6566
HttpJsonDirectStreamController<RequestT, ResponseT> controller =
6667
new HttpJsonDirectStreamController<>(call, responseObserver);
67-
controller.start(request, context);
68+
try (Scope ignored = context.getTracer().inScope()) {
69+
controller.start(request, context);
70+
}
6871
}
6972
}

0 commit comments

Comments
 (0)