|
28 | 28 | import static org.junit.Assert.assertTrue; |
29 | 29 | import static org.mockito.ArgumentMatchers.any; |
30 | 30 | import static org.mockito.ArgumentMatchers.anyDouble; |
| 31 | +import static org.mockito.Mockito.mock; |
| 32 | +import static org.mockito.Mockito.timeout; |
31 | 33 | import static org.mockito.Mockito.verify; |
| 34 | +import static org.mockito.Mockito.when; |
32 | 35 |
|
| 36 | +import com.google.common.base.Stopwatch; |
| 37 | +import com.google.common.collect.ImmutableList; |
33 | 38 | import com.google.common.collect.ImmutableMap; |
34 | 39 | import io.grpc.Attributes; |
35 | 40 | import io.grpc.CallOptions; |
|
38 | 43 | import io.grpc.ClientInterceptor; |
39 | 44 | import io.grpc.ClientInterceptors; |
40 | 45 | import io.grpc.ClientStreamTracer; |
| 46 | +import io.grpc.ForwardingClientCall; |
41 | 47 | import io.grpc.KnownLength; |
42 | 48 | import io.grpc.Metadata; |
43 | 49 | import io.grpc.MethodDescriptor; |
|
62 | 68 | import io.grpc.testing.protobuf.SimpleServiceGrpc; |
63 | 69 | import io.opentelemetry.api.OpenTelemetry; |
64 | 70 | import io.opentelemetry.api.baggage.Baggage; |
| 71 | +import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; |
65 | 72 | import io.opentelemetry.api.common.AttributeKey; |
66 | 73 | import io.opentelemetry.api.metrics.DoubleHistogram; |
| 74 | +import io.opentelemetry.api.metrics.LongCounter; |
| 75 | +import io.opentelemetry.api.metrics.LongHistogram; |
67 | 76 | import io.opentelemetry.api.metrics.Meter; |
68 | 77 | import io.opentelemetry.context.Context; |
| 78 | +import io.opentelemetry.context.propagation.ContextPropagators; |
| 79 | +import io.opentelemetry.context.propagation.TextMapPropagator; |
69 | 80 | import io.opentelemetry.context.propagation.TextMapSetter; |
70 | 81 | import io.opentelemetry.sdk.common.InstrumentationScopeInfo; |
71 | 82 | import io.opentelemetry.sdk.metrics.data.MetricData; |
|
76 | 87 | import java.util.List; |
77 | 88 | import java.util.Map; |
78 | 89 | import java.util.Optional; |
| 90 | +import java.util.concurrent.ExecutorService; |
| 91 | +import java.util.concurrent.Executors; |
79 | 92 | import java.util.concurrent.TimeUnit; |
80 | 93 | import java.util.concurrent.atomic.AtomicReference; |
81 | 94 | import javax.annotation.Nullable; |
|
96 | 109 | */ |
97 | 110 | @RunWith(JUnit4.class) |
98 | 111 | public class OpenTelemetryMetricsModuleTest { |
| 112 | + // ... existing code ... |
99 | 113 |
|
100 | 114 | private static final CallOptions.Key<String> CUSTOM_OPTION = |
101 | 115 | CallOptions.Key.createWithDefault("option1", "default"); |
@@ -1910,4 +1924,170 @@ public void unaryRpc(SimpleRequest request, StreamObserver<SimpleResponse> respo |
1910 | 1924 | responseObserver.onCompleted(); |
1911 | 1925 | } |
1912 | 1926 | } |
| 1927 | + |
| 1928 | + @Test |
| 1929 | + public void serverMetricsShouldRecordContextWithBaggage() { |
| 1930 | + // Mocks |
| 1931 | + DoubleHistogram serverCallDurationCounter = mock(DoubleHistogram.class); |
| 1932 | + OpenTelemetryMetricsResource resource = mock(OpenTelemetryMetricsResource.class); |
| 1933 | + when(resource.serverCallDurationCounter()).thenReturn(serverCallDurationCounter); |
| 1934 | + |
| 1935 | + // ContextPropagators with Baggage |
| 1936 | + ContextPropagators propagators = ContextPropagators.create( |
| 1937 | + TextMapPropagator.composite(W3CBaggagePropagator.getInstance())); |
| 1938 | + |
| 1939 | + // Module |
| 1940 | + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( |
| 1941 | + Stopwatch::createUnstarted, |
| 1942 | + resource, |
| 1943 | + ImmutableList.of(), |
| 1944 | + ImmutableList.of(), |
| 1945 | + propagators); |
| 1946 | + |
| 1947 | + // Baggage to inject |
| 1948 | + Baggage baggage = Baggage.builder().put("my-baggage-key", "my-baggage-value").build(); |
| 1949 | + Metadata headers = new Metadata(); |
| 1950 | + propagators.getTextMapPropagator().inject(Context.root().with(baggage), headers, |
| 1951 | + new MetadataSetter()); |
| 1952 | + |
| 1953 | + // Create Tracer |
| 1954 | + io.grpc.ServerStreamTracer.Factory factory = module.getServerTracerFactory(); |
| 1955 | + io.grpc.ServerStreamTracer tracer = factory.newServerStreamTracer("test/method", headers); |
| 1956 | + |
| 1957 | + // Close stream logic |
| 1958 | + tracer.streamClosed(Status.OK); |
| 1959 | + |
| 1960 | + // Verify record called with context (which should have baggage) |
| 1961 | + verify(serverCallDurationCounter).record( |
| 1962 | + anyDouble(), |
| 1963 | + any(), |
| 1964 | + org.mockito.ArgumentMatchers.argThat(ctx -> { |
| 1965 | + Baggage b = Baggage.fromContext(ctx); |
| 1966 | + return "my-baggage-value".equals(b.getEntryValue("my-baggage-key")); |
| 1967 | + })); |
| 1968 | + } |
| 1969 | + |
| 1970 | + @Test |
| 1971 | + public void serverMetrics_withExternalExecutor_propagatesBaggage() throws Exception { |
| 1972 | + // Setup Mocks & Resource |
| 1973 | + DoubleHistogram serverCallDurationCounter = mock(DoubleHistogram.class); |
| 1974 | + LongCounter serverCallCountCounter = mock(LongCounter.class); |
| 1975 | + LongHistogram serverTotalSentCompressedMessageSizeCounter = mock(LongHistogram.class); |
| 1976 | + LongHistogram serverTotalReceivedCompressedMessageSizeCounter = mock(LongHistogram.class); |
| 1977 | + OpenTelemetryMetricsResource resource = mock(OpenTelemetryMetricsResource.class); |
| 1978 | + when(resource.serverCallDurationCounter()).thenReturn(serverCallDurationCounter); |
| 1979 | + when(resource.serverCallCountCounter()).thenReturn(serverCallCountCounter); |
| 1980 | + when(resource.serverTotalSentCompressedMessageSizeCounter()) |
| 1981 | + .thenReturn(serverTotalSentCompressedMessageSizeCounter); |
| 1982 | + when(resource.serverTotalReceivedCompressedMessageSizeCounter()) |
| 1983 | + .thenReturn(serverTotalReceivedCompressedMessageSizeCounter); |
| 1984 | + |
| 1985 | + // Setup Propagators |
| 1986 | + ContextPropagators propagators = ContextPropagators.create(W3CBaggagePropagator.getInstance()); |
| 1987 | + |
| 1988 | + // Initialize Module |
| 1989 | + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( |
| 1990 | + Stopwatch::createUnstarted, |
| 1991 | + resource, |
| 1992 | + ImmutableList.of(), |
| 1993 | + ImmutableList.of(), |
| 1994 | + propagators); |
| 1995 | + |
| 1996 | + // Setup Server with Wrapped Executor (Custom Executor) |
| 1997 | + ExecutorService customExecutor = Executors.newFixedThreadPool(2); |
| 1998 | + java.util.concurrent.Executor rawExecutor = customExecutor; |
| 1999 | + |
| 2000 | + String serverName = InProcessServerBuilder.generateName(); |
| 2001 | + io.grpc.Server server = InProcessServerBuilder.forName(serverName) |
| 2002 | + .executor(rawExecutor) |
| 2003 | + .addService(new SimpleServiceGrpc.SimpleServiceImplBase() { |
| 2004 | + @Override |
| 2005 | + public void unaryRpc( |
| 2006 | + SimpleRequest request, |
| 2007 | + StreamObserver<SimpleResponse> responseObserver) { |
| 2008 | + responseObserver.onNext(SimpleResponse.getDefaultInstance()); |
| 2009 | + responseObserver.onCompleted(); |
| 2010 | + } |
| 2011 | + }) |
| 2012 | + .addStreamTracerFactory(module.getServerTracerFactory()) |
| 2013 | + .build().start(); |
| 2014 | + |
| 2015 | + // Client Interceptor to inject baggage |
| 2016 | + ClientInterceptor baggageInterceptor = new ClientInterceptor() { |
| 2017 | + @Override |
| 2018 | + public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( |
| 2019 | + MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { |
| 2020 | + return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>( |
| 2021 | + next.newCall(method, callOptions)) { |
| 2022 | + @Override |
| 2023 | + public void start(Listener<RespT> responseListener, Metadata headers) { |
| 2024 | + propagators.getTextMapPropagator().inject(Context.current(), headers, |
| 2025 | + new MetadataSetter()); |
| 2026 | + super.start(responseListener, headers); |
| 2027 | + } |
| 2028 | + }; |
| 2029 | + } |
| 2030 | + }; |
| 2031 | + |
| 2032 | + // Setup Client and Inject Baggage |
| 2033 | + io.grpc.ManagedChannel channel = InProcessChannelBuilder.forName(serverName) |
| 2034 | + .intercept(baggageInterceptor) |
| 2035 | + .directExecutor() |
| 2036 | + .build(); |
| 2037 | + SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc |
| 2038 | + .newBlockingStub(channel); |
| 2039 | + |
| 2040 | + // Define multiple Baggage items |
| 2041 | + Baggage testBaggage = Baggage.builder() |
| 2042 | + .put("key1", "value1") |
| 2043 | + .put("key2", "value/with/special:chars") |
| 2044 | + .build(); |
| 2045 | + |
| 2046 | + // Make the call with Baggage in Context |
| 2047 | + try (io.opentelemetry.context.Scope scope = Context.current().with(testBaggage).makeCurrent()) { |
| 2048 | + stub.unaryRpc(SimpleRequest.getDefaultInstance()); |
| 2049 | + } |
| 2050 | + |
| 2051 | + // Shutdown and Wait |
| 2052 | + channel.shutdownNow(); |
| 2053 | + server.shutdown().awaitTermination(5, TimeUnit.SECONDS); |
| 2054 | + customExecutor.shutdownNow(); |
| 2055 | + |
| 2056 | + // Verification Logic for Baggage |
| 2057 | + org.mockito.ArgumentMatcher<Context> baggageMatcher = ctx -> { |
| 2058 | + Baggage b = Baggage.fromContext(ctx); |
| 2059 | + return "value1".equals(b.getEntryValue("key1")) |
| 2060 | + && "value/with/special:chars".equals(b.getEntryValue("key2")); |
| 2061 | + }; |
| 2062 | + |
| 2063 | + // Verify all metrics recorded with correct baggage |
| 2064 | + // Use timeout to avoid race conditions as metrics might be recorded |
| 2065 | + // asynchronously |
| 2066 | + verify(serverCallDurationCounter, timeout(5000)).record( |
| 2067 | + anyDouble(), |
| 2068 | + any(), // Attributes |
| 2069 | + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); |
| 2070 | + |
| 2071 | + verify(serverCallCountCounter, timeout(5000)).add( |
| 2072 | + org.mockito.ArgumentMatchers.eq(1L), |
| 2073 | + any(), // Attributes |
| 2074 | + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); |
| 2075 | + |
| 2076 | + verify(serverTotalSentCompressedMessageSizeCounter, timeout(5000)).record( |
| 2077 | + org.mockito.ArgumentMatchers.anyLong(), |
| 2078 | + any(), // Attributes |
| 2079 | + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); |
| 2080 | + |
| 2081 | + verify(serverTotalReceivedCompressedMessageSizeCounter, timeout(5000)).record( |
| 2082 | + org.mockito.ArgumentMatchers.anyLong(), |
| 2083 | + any(), // Attributes |
| 2084 | + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); |
| 2085 | + } |
| 2086 | + |
| 2087 | + private static class MetadataSetter implements TextMapSetter<Metadata> { |
| 2088 | + @Override |
| 2089 | + public void set(Metadata carrier, String key, String value) { |
| 2090 | + carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); |
| 2091 | + } |
| 2092 | + } |
1913 | 2093 | } |
0 commit comments