diff --git a/instrumentation/finagle-http-23.11/compile-stub/build.gradle.kts b/instrumentation/finagle-http-23.11/compile-stub/build.gradle.kts new file mode 100644 index 000000000000..0a3932d24293 --- /dev/null +++ b/instrumentation/finagle-http-23.11/compile-stub/build.gradle.kts @@ -0,0 +1,3 @@ +plugins { + id("otel.java-conventions") +} diff --git a/instrumentation/finagle-http-23.11/compile-stub/src/main/java/com/twitter/util/Promise.java b/instrumentation/finagle-http-23.11/compile-stub/src/main/java/com/twitter/util/Promise.java new file mode 100644 index 000000000000..fc6ff49134c4 --- /dev/null +++ b/instrumentation/finagle-http-23.11/compile-stub/src/main/java/com/twitter/util/Promise.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.twitter.util; + +// public accessible stubs of mirrored types in com.twitter.util to ensure compilation; +// these classes are consumed as a compileOnly module and replaced at runtime with their +// stubbed counterparts +public class Promise { + private Promise() {} + + @SuppressWarnings("ClassNamedLikeTypeParameter") + public abstract static class K {} + + public abstract static class Interruptible {} +} diff --git a/instrumentation/finagle-http-23.11/javaagent/build.gradle.kts b/instrumentation/finagle-http-23.11/javaagent/build.gradle.kts index 0c2b73906233..156c95f0cc7e 100644 --- a/instrumentation/finagle-http-23.11/javaagent/build.gradle.kts +++ b/instrumentation/finagle-http-23.11/javaagent/build.gradle.kts @@ -32,11 +32,17 @@ dependencies { library("${scalified("com.twitter:finagle-http")}:$finagleVersion") - // should wire netty contexts testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent")) + // Exclude the Promise stub and its nested classes; + // this allows us to compile against these types in the instrumentation + // despite them being private in their original inner class; + // this is required for VirtualField to have a concrete type to find/get/set on. + compileOnly(project(":instrumentation:finagle-http-23.11:compile-stub")) + implementation(project(":instrumentation:netty:netty-4.1:javaagent")) implementation(project(":instrumentation:netty:netty-4.1:library")) + implementation(project(":instrumentation:netty:netty-common-4.0:javaagent")) implementation(project(":instrumentation:netty:netty-common-4.0:library")) } @@ -48,6 +54,17 @@ tasks { } test { + jvmArgs("-Dotel.instrumentation.http.client.emit-experimental-telemetry=true") + jvmArgs("-Dotel.instrumentation.http.server.emit-experimental-telemetry=true") + jvmArgs("-Dio.opentelemetry.context.enableStrictContext=true") + + // force the netty event loop into constrained territory + systemProperty("io.netty.eventLoopThreads", "2") + // ensure concurrent tests are competing for netty workers + systemProperty("com.twitter.finagle.netty4.numWorkers", "2") + // ensure concurrent tests are competing for offload pool workers + systemProperty("com.twitter.finagle.offload.numWorkers", "2") + systemProperty( "metadataConfig", "otel.instrumentation.http.client.emit-experimental-telemetry=true," + @@ -59,6 +76,15 @@ tasks { testClassesDirs = sourceSets.test.get().output.classesDirs classpath = sourceSets.test.get().runtimeClasspath jvmArgs("-Dotel.semconv-stability.opt-in=service.peer") + jvmArgs("-Dio.opentelemetry.context.enableStrictContext=true") + + // force the netty event loop into constrained territory + systemProperty("io.netty.eventLoopThreads", "2") + // ensure concurrent tests are competing for netty workers + systemProperty("com.twitter.finagle.netty4.numWorkers", "2") + // ensure concurrent tests are competing for offload pool workers + systemProperty("com.twitter.finagle.offload.numWorkers", "2") + systemProperty( "metadataConfig", "otel.instrumentation.http.client.emit-experimental-telemetry=true," + diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/com/twitter/finagle/Netty4HttpPackageHelpers.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/com/twitter/finagle/Netty4HttpPackageHelpers.java new file mode 100644 index 000000000000..7e3cc16b826c --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/com/twitter/finagle/Netty4HttpPackageHelpers.java @@ -0,0 +1,16 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.twitter.finagle; + +import com.twitter.finagle.netty4.http.package$; + +public class Netty4HttpPackageHelpers { + private Netty4HttpPackageHelpers() {} + + public static String getHttpCodecName() { + return package$.MODULE$.HttpCodecName(); + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/BijectionsNettyInstrumentation.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/BijectionsNettyInstrumentation.java new file mode 100644 index 000000000000..38f7126c7ae7 --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/BijectionsNettyInstrumentation.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import com.twitter.finagle.http.Request; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpRequest; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +/** Bridges the Context from Netty request types to finagle request types. */ +class BijectionsNettyInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("com.twitter.finagle.netty4.http.Bijections$netty$"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(named("fullRequestToFinagle")), getClass().getName() + "$FullRequestAdvice"); + transformer.applyAdviceToMethod( + isMethod().and(named("chunkedRequestToFinagle")), + getClass().getName() + "$ChunkedRequestAdvice"); + } + + @SuppressWarnings("unused") + public static class FullRequestAdvice { + @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + public static void onApplyExit( + @Advice.Return Request ret, @Advice.Argument(0) FullHttpRequest in) { + Helpers.chainContextToFinagle(in, ret); + } + } + + @SuppressWarnings("unused") + public static class ChunkedRequestAdvice { + @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + public static void onApplyExit(@Advice.Return Request ret, @Advice.Argument(0) HttpRequest in) { + Helpers.chainContextToFinagle(in, ret); + } + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ChannelTransportInstrumentation.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ChannelTransportInstrumentation.java index 0dc2272dee6e..b745216d72a4 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ChannelTransportInstrumentation.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ChannelTransportInstrumentation.java @@ -5,18 +5,17 @@ package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.named; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; +import io.netty.channel.Channel; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import scala.Option; +/** Amends the tail of the Netty pipeline to bridge the netty request to its finagle request. */ class ChannelTransportInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { @@ -25,27 +24,15 @@ public ElementMatcher typeMatcher() { @Override public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod(named("write"), getClass().getName() + "$WriteAdvice"); + transformer.applyAdviceToMethod(isConstructor(), getClass().getName() + "$ConstructorAdvice"); } @SuppressWarnings("unused") - public static class WriteAdvice { + public static class ConstructorAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) - @Nullable - public static Scope methodEnter() { - Option ref = Helpers.contextLocal().apply(); - if (ref.isDefined()) { - return ref.get().makeCurrent(); - } - return null; - } - - @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false) - public static void methodExit(@Advice.Enter @Nullable Scope scope) { - if (scope != null) { - scope.close(); - } + @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + public static void methodExit(@Advice.Argument(0) Channel ch) { + Helpers.mutateHandlerPipeline(ch); } } } diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FinagleHttpInstrumentationModule.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FinagleHttpInstrumentationModule.java index 06721781875b..81658c6ca854 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FinagleHttpInstrumentationModule.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FinagleHttpInstrumentationModule.java @@ -24,6 +24,7 @@ public FinagleHttpInstrumentationModule() { @Override public List typeInstrumentations() { return asList( + new BijectionsNettyInstrumentation(), new GenStreamingServerDispatcherInstrumentation(), new ChannelTransportInstrumentation(), new H2StreamChannelInitInstrumentation()); @@ -40,12 +41,14 @@ public List injectedClassNames() { // these are injected so that they can access package-private members return asList( "com.twitter.finagle.ChannelTransportHelpers", + "com.twitter.finagle.Netty4HttpPackageHelpers", "io.netty.channel.OpenTelemetryChannelInitializerDelegate"); } @Override public boolean isHelperClass(String className) { return className.equals("com.twitter.finagle.ChannelTransportHelpers") + || className.equals("com.twitter.finagle.Netty4HttpPackageHelpers") || className.equals("io.netty.channel.OpenTelemetryChannelInitializerDelegate"); } } diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Function1Wrapper.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Function1Wrapper.java deleted file mode 100644 index 61aad8de7832..000000000000 --- a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Function1Wrapper.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; - -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import scala.Function1; - -public class Function1Wrapper { - - public static Function1 wrap(Function1 function1) { - Context context = Context.current(); - return value -> { - try (Scope ignored = context.makeCurrent()) { - return function1.apply(value); - } - }; - } - - private Function1Wrapper() {} -} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FutureInstrumentation.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FutureInstrumentation.java new file mode 100644 index 000000000000..68afcf42e23a --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FutureInstrumentation.java @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import com.twitter.util.Future; +import com.twitter.util.Try; +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import scala.Function1; +import scala.runtime.BoxedUnit; + +/** Instruments additional Future types that aren't captured by Promise.K. */ +class FutureInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("com.twitter.util.ConstFuture"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod().and(named("respond")), getClass().getName() + "$RespondAdvice"); + + // transformTry is documented as not being run in the scheduler, so it's not handled + transformer.applyAdviceToMethod( + isMethod().and(named("transform")), getClass().getName() + "$TransformAdvice"); + } + + @SuppressWarnings("unused") + public static class RespondAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + @Advice.AssignReturned.ToArguments(@ToArgument(0)) + public static Function1, BoxedUnit> onEnter( + @Advice.Argument(0) Function1, BoxedUnit> f) { + return TwitterUtilCoreHelpers.wrap(Context.current(), f); + } + } + + @SuppressWarnings("unused") + public static class TransformAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + @Advice.AssignReturned.ToArguments(@ToArgument(0)) + public static Function1, Future> onEnter( + @Advice.Argument(0) Function1, Future> f) { + return TwitterUtilCoreHelpers.wrap(Context.current(), f); + } + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/PromiseMonitoredInstrumentation.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FuturePoolInstrumentation.java similarity index 50% rename from instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/PromiseMonitoredInstrumentation.java rename to instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FuturePoolInstrumentation.java index ad354cf2d2e7..9116cbcfd04d 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/PromiseMonitoredInstrumentation.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FuturePoolInstrumentation.java @@ -5,43 +5,41 @@ package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; -import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import scala.Function1; +import scala.Function0; -class PromiseMonitoredInstrumentation implements TypeInstrumentation { +/** + * Instruments {@link com.twitter.util.ExecutorServiceFuturePool#apply} to wrap the submitted {@link + * Function0} so it executes under the caller's otel {@link Context}. + */ +class FuturePoolInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - return named("com.twitter.util.Promise$Monitored"); + return named("com.twitter.util.ExecutorServiceFuturePool"); } @Override public void transform(TypeTransformer transformer) { transformer.applyAdviceToMethod( - isConstructor().and(takesArgument(1, named("scala.Function1"))), - getClass().getName() + "$WrapFunctionAdvice"); + isMethod().and(named("apply")), getClass().getName() + "$ApplyAdvice"); } @SuppressWarnings("unused") - public static class WrapFunctionAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) - @Advice.AssignReturned.ToArguments(@ToArgument(1)) - public static Function1 wrap(@Advice.Argument(1) Function1 function1) { - if (function1 == null) { - return null; - } - - return Function1Wrapper.wrap(function1); + public static class ApplyAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + @Advice.AssignReturned.ToArguments(@ToArgument(0)) + public static Function0 onApplyEnter(@Advice.Argument(0) Function0 f) { + return TwitterUtilCoreHelpers.wrap(Context.current(), f); } } } diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/GenStreamingServerDispatcherInstrumentation.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/GenStreamingServerDispatcherInstrumentation.java index b69dff2cb80d..c254ee8bddd7 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/GenStreamingServerDispatcherInstrumentation.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/GenStreamingServerDispatcherInstrumentation.java @@ -9,13 +9,22 @@ import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasSuperType; import static net.bytebuddy.matcher.ElementMatchers.named; +import com.twitter.finagle.http.Request; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +/** + * Part 3/3 of bridging the otel Context from netty to finagle. Instruments the dispatch call to + * extract the Context from the finagle Request context and assert it as current for the duration of + * the dispatch. This allows the other instrumentations to take over and carry the Context to its + * next span/s. + */ class GenStreamingServerDispatcherInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { @@ -29,28 +38,30 @@ public ElementMatcher classLoaderOptimization() { @Override public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod(named("loop"), getClass().getName() + "$LoopAdvice"); + transformer.applyAdviceToMethod(named("dispatch"), getClass().getName() + "$DispatchAdvice"); } @SuppressWarnings("unused") - public static class LoopAdvice { + public static class DispatchAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) - public static void methodEnter() { - // this works bc at this point in the server evaluation, the netty - // instrumentation has already gone to work and assigned the context to the - // local thread; - // - // this works specifically in finagle's netty stack bc at this point the loop() - // method is running on a netty thread with the necessary access to the - // java-native ThreadLocal where the Context is stored - Helpers.contextLocal().update(Context.current()); + @Nullable + public static Scope methodEnter(@Advice.Argument(0) Object req) { + if (req instanceof Request) { + // practically this will always be a Request, from HttpServerDispatcher + Request request = (Request) req; + // if this is null, there's a bug in the instrumentation + Context context = request.ctx().apply(Helpers.OTEL_CONTEXT_KEY); + return context != null ? context.makeCurrent() : null; + } + return null; } @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false) - public static void methodExit() { - // always clear this - Helpers.contextLocal().clear(); + public static void methodExit(@Advice.Enter @Nullable Scope scope) { + if (scope != null) { + scope.close(); + } } } } diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/H2StreamChannelInitInstrumentation.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/H2StreamChannelInitInstrumentation.java index 784315f986b9..6b525b18d138 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/H2StreamChannelInitInstrumentation.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/H2StreamChannelInitInstrumentation.java @@ -16,6 +16,11 @@ import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +/** + * Bridges the instrumented netty instrumentation to finagle's http/2 netty integrations. Without + * this the link is broken as the netty ServerContexts don't pass through to the last handler in the + * pipeline where it's needed. + */ class H2StreamChannelInitInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Helpers.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Helpers.java index a3d57584f94b..8c1ee28537b7 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Helpers.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Helpers.java @@ -8,12 +8,21 @@ import static io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientRequestTracingHandler.HTTP_CLIENT_REQUEST; import static io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyClientSingletons.clientHandlerFactory; -import com.twitter.util.Local; +import com.twitter.finagle.ChannelTransportHelpers; +import com.twitter.finagle.Netty4HttpPackageHelpers; +import com.twitter.finagle.http.Request; +import com.twitter.finagle.http.Request$; +import com.twitter.finagle.http.collection.RecordSchema; +import com.twitter.finagle.http2.transport.common.Http2StreamMessageHandler; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.OpenTelemetryChannelInitializerDelegate; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.util.VirtualField; @@ -21,18 +30,24 @@ import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts; import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler; import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerTracingHandler; +import io.opentelemetry.javaagent.instrumentation.netty.common.v4_0.VirtualFieldHelper; import io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyServerSingletons; public class Helpers { - private static final VirtualField CHANNEL_HANDLER = - VirtualField.find(ChannelHandler.class, ChannelHandler.class); - private static final Local contextLocal = new Local<>(); + private static final VirtualField FULL_HTTP_REQUEST_CONTEXT = + VirtualField.find(FullHttpRequest.class, Context.class); + private static final VirtualField HTTP_REQUEST_CONTEXT = + VirtualField.find(HttpRequest.class, Context.class); - public static Local contextLocal() { - return contextLocal; - } + public static final RecordSchema.Field OTEL_CONTEXT_KEY = + Request$.MODULE$.Schema().newField(); + + private static final String OTEL_NETTY_HANDLER = "otelFinagleNettyHandler"; + + private Helpers() {} + /** Bridges the netty instrumentation to the finagle-netty integration. */ public static ChannelInitializer wrapServer(ChannelInitializer inner) { return new OpenTelemetryChannelInitializerDelegate(inner) { @@ -62,13 +77,14 @@ protected void initChannel(C channel) throws Exception { .pipeline() .addAfter(codecCtx.name(), ourHandler.getClass().getName(), ourHandler); // attach this in this way to match up with how netty instrumentation expects things - CHANNEL_HANDLER.set(codecCtx.handler(), ourHandler); + VirtualFieldHelper.CHANNEL_HANDLER.set(codecCtx.handler(), ourHandler); } } } }; } + /** Bridges the netty instrumentation to the finagle-netty integration (for h2). */ public static ChannelInitializer wrapClient(ChannelInitializer inner) { return new OpenTelemetryChannelInitializerDelegate(inner) { @@ -99,12 +115,67 @@ protected void initChannel(C channel) throws Exception { .pipeline() .addAfter(codecCtx.name(), ourHandler.getClass().getName(), ourHandler); // attach this in this way to match up with how netty instrumentation expects things - CHANNEL_HANDLER.set(codecCtx.handler(), ourHandler); + VirtualFieldHelper.CHANNEL_HANDLER.set(codecCtx.handler(), ourHandler); } } } }; } - private Helpers() {} + /** Part 1/3 of bridging the otel Context from netty to finagle (for h2). */ + public static void mutateHandlerPipeline(Channel ch) { + ChannelHandler h1Handler = ch.pipeline().get(Netty4HttpPackageHelpers.getHttpCodecName()); + Http2StreamMessageHandler h2Handler = ch.pipeline().get(Http2StreamMessageHandler.class); + + // h1 server handler || h2 server handler; + // private class on a semi-private type -- not bothering to extract that any other way + if (h1Handler instanceof HttpServerCodec + || (h2Handler != null + && h2Handler + .getClass() + .getName() + .equals( + "com.twitter.finagle.http2.transport.common.Http2StreamMessageHandler$ServerHttp2StreamMessageHandler"))) { + // ensure we capture the server context and assign it to the outgoing request before offering + // to the AsyncQueue; + // not applicable to clients + ch.pipeline() + .addBefore( + ChannelTransportHelpers.getHandlerName(), + OTEL_NETTY_HANDLER, + new ChannelInboundHandlerAdapter() { + /* + Assign the context to the request. + + Part 1/3 for chaining the context from netty to finagle. + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + // type switch courtesy of + // com.twitter.finagle.netty4.http.Netty4ServerStreamTransport.read() + if (msg instanceof FullHttpRequest) { + FULL_HTTP_REQUEST_CONTEXT.set((FullHttpRequest) msg, Context.current()); + } else if (msg instanceof HttpRequest) { + HTTP_REQUEST_CONTEXT.set((HttpRequest) msg, Context.current()); + } + + super.channelRead(ctx, msg); + } + }); + } + } + + /** Part 2/3 of bridging the otel Context from netty to finagle. */ + public static void chainContextToFinagle(Object msg, Request request) { + Context context = null; + // type switch courtesy of com.twitter.finagle.netty4.http.Netty4ServerStreamTransport.read() + if (msg instanceof FullHttpRequest) { + context = FULL_HTTP_REQUEST_CONTEXT.get((FullHttpRequest) msg); + } else if (msg instanceof HttpRequest) { + context = HTTP_REQUEST_CONTEXT.get((HttpRequest) msg); + } + + // hook the Context from netty's request up to finagle's request + request.ctx().updateAndLock(OTEL_CONTEXT_KEY, context); + } } diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/LocalSchedulerActivationInstrumentation.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/LocalSchedulerActivationInstrumentation.java index 0971708f7ab9..f7c8632a8c76 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/LocalSchedulerActivationInstrumentation.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/LocalSchedulerActivationInstrumentation.java @@ -18,6 +18,10 @@ import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +/** + * Instruments the local scheduler submissions. Other Scheduler types are handled indirectly by Java + * Executor instrumentation. + */ class LocalSchedulerActivationInstrumentation implements TypeInstrumentation { @Override @@ -42,6 +46,7 @@ public static Runnable wrap(@Advice.Argument(0) Runnable task) { return null; } + // always set it: you never know what might be polluting the thread local context at the time Context context = Java8BytecodeBridge.currentContext(); return ContextPropagatingRunnable.propagateContext(task, context); } diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/PromiseInterruptibleInstrumentation.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/PromiseInterruptibleInstrumentation.java new file mode 100644 index 000000000000..2fdf8abe5331 --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/PromiseInterruptibleInstrumentation.java @@ -0,0 +1,50 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.asm.Advice.AssignReturned.ToArguments.ToArgument; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import scala.PartialFunction; +import scala.runtime.BoxedUnit; + +/** + * Inspired by Kamon's approach, instruments the interruptible such that it has access to the + * Context active on the Promise. + */ +class PromiseInterruptibleInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("com.twitter.util.Promise$Interruptible"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod(isConstructor(), getClass().getName() + "$ConstructorAdvice"); + } + + @SuppressWarnings("unused") + public static class ConstructorAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + @Advice.AssignReturned.ToArguments(@ToArgument(1)) + public static PartialFunction onExit( + @Advice.Argument(1) PartialFunction handler) { + if (handler instanceof TwitterUtilCoreHelpers.InterruptibleWithContext) { + return handler; + } + return new TwitterUtilCoreHelpers.InterruptibleWithContext(Context.current(), handler); + } + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/PromiseKInstrumentation.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/PromiseKInstrumentation.java new file mode 100644 index 000000000000..eba5e9befe4e --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/PromiseKInstrumentation.java @@ -0,0 +1,78 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.twitter.util.Promise; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import javax.annotation.Nullable; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +/** + * Instruments the Promise state machine so that all chains in the Futures/Fibers are + * otel-Context-coherent. + */ +class PromiseKInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + // includes Transformer's two lone derivatives + return named("com.twitter.util.Promise$Transformer") + .or(named("com.twitter.util.Promise$Monitored")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor().and(takesArgument(0, named("com.twitter.util.Local$Context"))), + getClass().getName() + "$TrapContextAdvice"); + transformer.applyAdviceToMethod( + isMethod().and(named("apply").and(takesArgument(0, named("com.twitter.util.Try")))), + getClass().getName() + "$ApplyAdvice"); + } + + @SuppressWarnings("unused") + public static class TrapContextAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + public static void onExit( + @Advice.This + @SuppressWarnings("rawtypes") // type is from compile-stub and masks private type + Promise.K thiz) { + TwitterUtilCoreHelpers.PROMISE_K_CONTEXT_FIELD.set(thiz, Context.current()); + } + } + + @SuppressWarnings("unused") + public static class ApplyAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) + @Nullable + public static Scope onApplyEnter( + @Advice.This + @SuppressWarnings("rawtypes") // type is from compile-stub and masks private type + Promise.K thiz) { + // if this is null, there's a bug in the instrumentation + Context savedContext = TwitterUtilCoreHelpers.PROMISE_K_CONTEXT_FIELD.get(thiz); + return savedContext != null ? savedContext.makeCurrent() : null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) + public static void onApplyExit(@Advice.Enter @Nullable Scope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/TwitterUtilCoreHelpers.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/TwitterUtilCoreHelpers.java new file mode 100644 index 000000000000..2642b3f1843b --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/TwitterUtilCoreHelpers.java @@ -0,0 +1,69 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import com.twitter.util.Promise; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.util.VirtualField; +import scala.Function0; +import scala.Function1; +import scala.PartialFunction; +import scala.runtime.AbstractPartialFunction; +import scala.runtime.BoxedUnit; + +public class TwitterUtilCoreHelpers { + @SuppressWarnings("rawtypes") // type is from compile-stub and masks private type + public static final VirtualField PROMISE_K_CONTEXT_FIELD = + VirtualField.find(Promise.K.class, Context.class); + + private TwitterUtilCoreHelpers() {} + + public static class InterruptibleWithContext + extends AbstractPartialFunction { + private final Context context; + private final PartialFunction delegate; + + public InterruptibleWithContext( + Context context, PartialFunction delegate) { + this.context = context; + this.delegate = delegate; + } + + @Override + public boolean isDefinedAt(Throwable x) { + try (Scope ignored = context.makeCurrent()) { + // Return true only for inputs this function handles + return delegate.isDefinedAt(x); + } + } + + @Override + public BoxedUnit apply(Throwable x) { + try (Scope ignored = context.makeCurrent()) { + return delegate.apply(x); + } + } + } + + public static Function1 wrap(Context context, Function1 fn) { + return t -> { + // always set it: you never know what might be polluting the thread local context at the time + try (Scope ignored = context.makeCurrent()) { + return fn.apply(t); + } + }; + } + + public static Function0 wrap(Context context, Function0 fn) { + return () -> { + // always set it: you never know what might be polluting the thread local context at the time + try (Scope ignored = context.makeCurrent()) { + return fn.apply(); + } + }; + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/TwitterUtilCoreInstrumentationModule.java b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/TwitterUtilCoreInstrumentationModule.java index 14d77bad8925..0f15c00bdef3 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/TwitterUtilCoreInstrumentationModule.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/TwitterUtilCoreInstrumentationModule.java @@ -21,14 +21,13 @@ public TwitterUtilCoreInstrumentationModule() { super("finagle-http", "finagle-http-23.11", "twitter-util-core"); } - @Override - public String getModuleGroup() { - return "netty"; - } - @Override public List typeInstrumentations() { return asList( - new LocalSchedulerActivationInstrumentation(), new PromiseMonitoredInstrumentation()); + new FutureInstrumentation(), + new FuturePoolInstrumentation(), + new PromiseKInstrumentation(), + new PromiseInterruptibleInstrumentation(), + new LocalSchedulerActivationInstrumentation()); } } diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ClientTest.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/AbstractClientTest.java similarity index 71% rename from instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ClientTest.java rename to instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/AbstractClientTest.java index 9e7fb71ff203..b9c854074cbb 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ClientTest.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/AbstractClientTest.java @@ -5,7 +5,6 @@ package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; -import static io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11.Utils.createClient; import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; import static java.util.Collections.emptySet; @@ -20,13 +19,10 @@ import com.twitter.util.Await; import com.twitter.util.Duration; import com.twitter.util.Future; -import com.twitter.util.FuturePool; -import com.twitter.util.Time; import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; import io.netty.handler.timeout.ReadTimeoutException; import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.test.utils.PortUtils; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest; import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension; @@ -40,9 +36,6 @@ import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import org.assertj.core.api.AbstractThrowableAssert; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.extension.RegisterExtension; /** @@ -56,50 +49,23 @@ */ // todo implement http/2-specific tests; // otel test framework doesn't support an http/2 server out of the box -class ClientTest extends AbstractHttpClientTest { +abstract class AbstractClientTest extends AbstractHttpClientTest { @RegisterExtension static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent(); - private final Map> clients = new ConcurrentHashMap<>(); - - // finagle Services are closeable, but are bound to a host + port; - // as these are only known during the invocation of the test, each test must create and then - // tear down their respective Services. - // - // however, the underlying netty bits are reused between Services by default, so "close" - // works out to a more "virtual" operation than with other client libraries. - @AfterEach - void tearDown() throws Exception { - for (Service client : clients.values()) { - Await.ready(client.close(Time.fromSeconds(10))); - } - clients.clear(); - } + /** Concrete subclasses provide their per-class client extension. */ + protected abstract FinagleClientExtension clientExtension(); private Service getClient(URI uri) { - return getClient(uri, uri.getScheme().equals("https") ? ClientType.TLS : ClientType.DEFAULT); + return clientExtension().getService(uri); } private Service getClient(URI uri, ClientType clientType) { - return clients.computeIfAbsent( - clientType, - (type) -> createClient(type).newService(uri.getHost() + ":" + Utils.safePort(uri))); + return clientExtension().getService(uri, clientType); } private Future doSendRequest(Request request, URI uri) { - // push this onto a FuturePool for 2 reasons: - // 1) forces the request handling onto a different thread, ensuring test accuracy - // 2) using the default thread can mess with high concurrency scenarios - Context context = Context.current(); - return FuturePool.unboundedPool() - .apply( - () -> { - try (Scope ignored = context.makeCurrent()) { - return Await.result(getClient(uri).apply(request)); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + return getClient(uri).apply(request); } @Override @@ -119,18 +85,26 @@ protected void configure(HttpClientTestOptions.Builder optionsBuilder) { }; }); - optionsBuilder.setHttpAttributes(ClientTest::getHttpAttributes); - optionsBuilder.setExpectedClientSpanNameMapper(ClientTest::getExpectedClientSpanName); + optionsBuilder.setHttpAttributes(AbstractClientTest::getHttpAttributes); + optionsBuilder.setExpectedClientSpanNameMapper(AbstractClientTest::getExpectedClientSpanName); optionsBuilder.disableTestRedirects(); optionsBuilder.spanEndsAfterBody(); optionsBuilder.setClientSpanErrorMapper( (uri, error) -> { - // all errors should be wrapped in RuntimeExceptions due to how we run things in - // doSendRequest() - AbstractThrowableAssert clientWrapAssert = - assertThat(error).isInstanceOf(RuntimeException.class); - if ("http://localhost:61/".equals(uri.toString()) - || "https://192.0.2.1/".equals(uri.toString())) { + if (("http://localhost:" + PortUtils.UNUSABLE_PORT + "/").equals(uri.toString())) { + assertThat(error) + .isInstanceOf(Failure.class) + .cause() + .isInstanceOf(ConnectionFailedException.class) + .cause() + // this is a private class + // .isInstanceOf(io.netty.channel.AbstractChannel.AnnotatedConnectException.class) + .cause() + // On Linux: ConnectException, On Windows: ClosedChannelException + .isInstanceOfAny(ConnectException.class, ClosedChannelException.class); + error = error.getCause().getCause(); + } else if ("https://192.0.2.1/".equals(uri.toString()) + || "http://192.0.2.1/".equals(uri.toString())) { // finagle handles all these in com.twitter.finagle.netty4.ConnectionBuilder.build(); // all errors emitted by the netty Bootstrap.connect() call are mapped to // twitter/finagle exceptions and handled accordingly; @@ -138,18 +112,17 @@ protected void configure(HttpClientTestOptions.Builder optionsBuilder) { // ConnectionFailedException // and then with a twitter Failure.rejected() call, resulting in the multiple nestings // of the root exception - clientWrapAssert - .cause() + assertThat(error) .isInstanceOf(Failure.class) .cause() .isInstanceOf(ConnectionFailedException.class) .cause() // On Linux: ConnectException, On Windows: ClosedChannelException .isInstanceOfAny(ConnectException.class, ClosedChannelException.class); - error = error.getCause().getCause().getCause(); + error = error.getCause().getCause(); } else if (uri.getPath().endsWith("/read-timeout")) { // not a connect() exception like the above, so is not wrapped as above; - clientWrapAssert.cause().isInstanceOf(ReadTimedOutException.class); + assertThat(error).isInstanceOf(ReadTimedOutException.class); // however, this specific case results in a mapping from netty's ReadTimeoutException // to finagle's ReadTimedOutException in the finagle client code, losing all trace of // the original exception; so we must construct it manually here @@ -176,8 +149,9 @@ public void sendRequestWithCallback( String method, URI uri, Map headers, - HttpClientResult httpClientResult) { - doSendRequest(request, uri) + HttpClientResult httpClientResult) + throws Exception { + Await.ready(doSendRequest(request, uri), Duration.fromSeconds(30)) .onSuccess( r -> { httpClientResult.complete(r.statusCode()); diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/AbstractServerTest.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/AbstractServerTest.java index 4c5e6e60f974..843e1bc2dc51 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/AbstractServerTest.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/AbstractServerTest.java @@ -15,6 +15,7 @@ import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT; import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS; +import com.twitter.finagle.Http; import com.twitter.finagle.ListeningServer; import com.twitter.finagle.Service; import com.twitter.finagle.http.Request; @@ -40,6 +41,16 @@ abstract class AbstractServerTest extends AbstractHttpServerTest client.withNoHttp2().withExecutionOffloaded(FuturePool.immediatePool())); + + @Override + protected FinagleClientExtension clientExtension() { + return CLIENT; + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ClientH1OffloadedTest.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ClientH1OffloadedTest.java new file mode 100644 index 000000000000..2f7d09a335a7 --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ClientH1OffloadedTest.java @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import com.twitter.util.FuturePool; +import org.junit.jupiter.api.extension.RegisterExtension; + +class ClientH1OffloadedTest extends AbstractClientTest { + + @RegisterExtension + static final FinagleClientExtension CLIENT = + new FinagleClientExtension( + // ensures all work is offloaded to a thread pool -- this is where problems can happen + client -> client.withNoHttp2().withExecutionOffloaded(FuturePool.unboundedPool())); + + @Override + protected FinagleClientExtension clientExtension() { + return CLIENT; + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ClientH1Test.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ClientH1Test.java new file mode 100644 index 000000000000..9a72edf77fcc --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ClientH1Test.java @@ -0,0 +1,23 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import com.twitter.finagle.Http; +import org.junit.jupiter.api.extension.RegisterExtension; + +class ClientH1Test extends AbstractClientTest { + + @RegisterExtension + static final FinagleClientExtension CLIENT = + new FinagleClientExtension( + // see note on AbstractClientTest about http/2 + Http.Client::withNoHttp2); + + @Override + protected FinagleClientExtension clientExtension() { + return CLIENT; + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FinagleClientExtension.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FinagleClientExtension.java new file mode 100644 index 000000000000..7c1bd76db347 --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/FinagleClientExtension.java @@ -0,0 +1,120 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import static io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest.CONNECTION_TIMEOUT; +import static io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest.READ_TIMEOUT; + +import com.twitter.finagle.Http; +import com.twitter.finagle.Service; +import com.twitter.finagle.http.Request; +import com.twitter.finagle.http.Response; +import com.twitter.finagle.service.RetryBudget; +import com.twitter.util.Await; +import com.twitter.util.Duration; +import com.twitter.util.Time; +import io.netty.channel.EventLoopGroup; +import io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11.Utils.ClientType; +import java.net.URI; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.UnaryOperator; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +/** + * Class-scoped JUnit extension that owns a Finagle {@link Http.Client} per {@link ClientType} and + * the underlying {@link EventLoopGroup} for the duration of the test class. Clients and services + * are created lazily on first use and torn down once in {@code afterAll}. + */ +public class FinagleClientExtension implements AfterAllCallback { + + private final UnaryOperator configurer; + + private final Map clients = new ConcurrentHashMap<>(); + private final Map> services = new ConcurrentHashMap<>(); + + public FinagleClientExtension(UnaryOperator configurer) { + this.configurer = configurer; + } + + @Override + public void afterAll(ExtensionContext context) throws Exception { + for (Service svc : services.values()) { + Await.ready(svc.close(Time.fromSeconds(10))); + } + services.clear(); + clients.clear(); + } + + public Service getService(URI uri) { + return getService(uri, "https".equals(uri.getScheme()) ? ClientType.TLS : ClientType.DEFAULT); + } + + public Service getService(URI uri, ClientType type) { + String dest = uri.getHost() + ":" + Utils.safePort(uri); + ServiceKey key = new ServiceKey(dest, type); + // Build the client and bind the service under the root OTel context so no test-trace context + // gets captured in long-lived internal state. + return services.computeIfAbsent( + key, k -> Context.root().wrapSupplier(() -> client(type).newService(dest)).get()); + } + + private Http.Client client(ClientType type) { + return clients.computeIfAbsent(type, this::buildClient); + } + + private Http.Client buildClient(ClientType type) { + Http.Client client = + Http.client() + .withTransport() + .readTimeout(Duration.fromMilliseconds(READ_TIMEOUT.toMillis())) + .withTransport() + .connectTimeout(Duration.fromMilliseconds(CONNECTION_TIMEOUT.toMillis())) + .withRetryBudget(RetryBudget.Empty()); + + switch (type) { + case TLS: + client = client.withTransport().tlsWithoutValidation(); + break; + case SINGLE_CONN: + client = client.withSessionPool().maxSize(1); + break; + case DEFAULT: + break; + } + return configurer.apply(client); + } + + private static final class ServiceKey { + final String dest; + final ClientType type; + + ServiceKey(String dest, ClientType type) { + this.dest = dest; + this.type = type; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ServiceKey)) { + return false; + } + ServiceKey that = (ServiceKey) o; + return dest.equals(that.dest) && type == that.type; + } + + @Override + public int hashCode() { + return Objects.hash(dest, type); + } + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH1ImmediateTest.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH1ImmediateTest.java new file mode 100644 index 000000000000..517c556b4479 --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH1ImmediateTest.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import com.twitter.finagle.Http; +import com.twitter.util.FuturePool; + +class ServerH1ImmediateTest extends ServerH1Test { + @Override + protected Http.Server configureServer(Http.Server in) { + return super.configureServer(in) + // ensures all work is single threaded (simple case) + .withExecutionOffloaded(FuturePool.immediatePool()); + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH1OffloadedTest.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH1OffloadedTest.java new file mode 100644 index 000000000000..e269714546eb --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH1OffloadedTest.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import com.twitter.finagle.Http; +import com.twitter.util.FuturePool; + +class ServerH1OffloadedTest extends ServerH1Test { + @Override + protected Http.Server configureServer(Http.Server in) { + return super.configureServer(in) + // ensures all work is offloaded to a thread pool -- this is where problems can happen + .withExecutionOffloaded(FuturePool.unboundedPool()); + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH1Test.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH1Test.java index 2cc5e4d53cd0..c1b9d28e72c6 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH1Test.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH1Test.java @@ -6,13 +6,10 @@ package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; import com.twitter.finagle.Http; -import com.twitter.finagle.ListeningServer; class ServerH1Test extends AbstractServerTest { @Override - protected ListeningServer setupServer() { - return Http.server() - .withNoHttp2() - .serve(address.getHost() + ":" + port, new AbstractServerTest.TestService()); + protected Http.Server configureServer(Http.Server in) { + return super.configureServer(in).withNoHttp2(); } } diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH2ImmediateTest.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH2ImmediateTest.java new file mode 100644 index 000000000000..b47de29cac10 --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH2ImmediateTest.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import com.twitter.finagle.Http; +import com.twitter.util.FuturePool; + +class ServerH2ImmediateTest extends ServerH2Test { + + @Override + protected Http.Server configureServer(Http.Server in) { + return super.configureServer(in) + // ensures all work is single threaded (simple case) + .withExecutionOffloaded(FuturePool.immediatePool()); + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH2OffloadedTest.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH2OffloadedTest.java new file mode 100644 index 000000000000..4f9203d1ce93 --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH2OffloadedTest.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import com.twitter.finagle.Http; +import com.twitter.util.FuturePool; + +class ServerH2OffloadedTest extends ServerH2Test { + + @Override + protected Http.Server configureServer(Http.Server in) { + return super.configureServer(in) + // ensures all work is offloaded to a thread pool -- this is where problems can happen + .withExecutionOffloaded(FuturePool.unboundedPool()); + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH2Test.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH2Test.java index cc4735eaa087..f7fd7907189e 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH2Test.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/ServerH2Test.java @@ -12,13 +12,13 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import com.twitter.finagle.Http; -import com.twitter.finagle.ListeningServer; import com.twitter.finagle.Service; import com.twitter.finagle.http.Request; import com.twitter.finagle.http.Response; import com.twitter.finagle.http2.param.PriorKnowledge; import com.twitter.util.Await; import com.twitter.util.Duration; +import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolSpecificEvent; import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint; @@ -38,15 +38,14 @@ class ServerH2Test extends AbstractServerTest { @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); @Override - protected ListeningServer setupServer() { - return Http.server() + protected Http.Server configureServer(Http.Server in) { + return super.configureServer(in) // when enabled, supports protocol h1 & h2, the latter with upgrade .withHttp2() // todo implement http/2-specific tests // the armeria configuration used at the heart of AbstractHttpServerTest isn't configurable // to http/2 - .configured(PriorKnowledge.apply(true).mk()) - .serve(address.getHost() + ":" + port, new AbstractServerTest.TestService()); + .configured(PriorKnowledge.apply(true).mk()); } private static void assertSwitchingProtocolsEvent(EventDataAssert eventDataAssert) { @@ -57,6 +56,7 @@ private static void assertSwitchingProtocolsEvent(EventDataAssert eventDataAsser equalTo(ProtocolSpecificEvent.SWITCHING_PROTOCOLS_TO_KEY, singletonList("h2c"))); } + /* Bonus is that this implicitly tests both the server & client h2 upgrades. */ @Test void h2ProtocolUpgrade() throws Exception { URI uri = URI.create("http://localhost:" + port + SUCCESS.getPath()); @@ -68,17 +68,20 @@ void h2ProtocolUpgrade() throws Exception { cleanup.deferCleanup(client::close); Response response = - Await.result( - client.apply( - Utils.buildRequest( - "GET", - uri, - ImmutableMap.of( - HttpHeaderNames.USER_AGENT.toString(), - TEST_USER_AGENT, - HttpHeaderNames.X_FORWARDED_FOR.toString(), - TEST_CLIENT_IP))), - Duration.fromSeconds(20)); + testing.runWithSpan( + "h2-upgrade-client", + () -> + Await.result( + client.apply( + Utils.buildRequest( + "GET", + uri, + ImmutableMap.of( + HttpHeaderNames.USER_AGENT.toString(), + TEST_USER_AGENT, + HttpHeaderNames.X_FORWARDED_FOR.toString(), + TEST_CLIENT_IP))), + Duration.fromSeconds(20))); assertThat(response.status().code()).isEqualTo(SUCCESS.getStatus()); assertThat(response.contentString()).isEqualTo(SUCCESS.getBody()); @@ -89,19 +92,27 @@ void h2ProtocolUpgrade() throws Exception { testing.waitAndAssertTraces( trace -> { List> spanAssertions = new ArrayList<>(); + // client initiation spanAssertions.add( - s -> s.hasEventsSatisfyingExactly(ServerH2Test::assertSwitchingProtocolsEvent)); + s -> s.hasName("h2-upgrade-client").hasNoParent().hasKind(SpanKind.INTERNAL)); + // actual client netty span (including upgrade event) spanAssertions.add( - span -> { - assertServerSpan(span, method, endpoint, endpoint.getStatus()); - span.hasEventsSatisfyingExactly(ServerH2Test::assertSwitchingProtocolsEvent); - }); - - int parentIndex = 1; + s -> + s.hasKind(SpanKind.CLIENT) + .hasName(method) + .hasParent(trace.getSpan(0)) + .hasEventsSatisfyingExactly(ServerH2Test::assertSwitchingProtocolsEvent)); + // server netty span (including upgrade event) + spanAssertions.add( + span -> + assertServerSpan(span, method, endpoint, endpoint.getStatus()) + .hasParent(trace.getSpan(1)) + .hasEventsSatisfyingExactly(ServerH2Test::assertSwitchingProtocolsEvent)); + // server controller span spanAssertions.add( span -> { assertControllerSpan(span, null); - span.hasParent(trace.getSpan(parentIndex)); + span.hasParent(trace.getSpan(2)); }); trace.hasSpansSatisfyingExactly(spanAssertions); diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/TwitterUtilCoreInstrumentationTest.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/TwitterUtilCoreInstrumentationTest.java new file mode 100644 index 000000000000..4696ced22a0f --- /dev/null +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/TwitterUtilCoreInstrumentationTest.java @@ -0,0 +1,409 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.finaglehttp.v23_11; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; + +import com.twitter.util.Await; +import com.twitter.util.Duration; +import com.twitter.util.Future; +import com.twitter.util.FuturePool; +import com.twitter.util.FuturePool$; +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.TraceAssert; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import scala.Function0; +import scala.Function1; +import scala.runtime.AbstractFunction0; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +class TwitterUtilCoreInstrumentationTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + + private static final Duration AWAIT = Duration.fromSeconds(10); + + @Test + void constFutureRespondPropagatesContext() { + testing.runWithSpan( + "parent", + () -> { + Future.value("v") + .respond( + fn1( + t -> { + testing.runWithSpan("respond-child", () -> {}); + return BoxedUnit.UNIT; + })); + }); + assertParentAndChild("parent", "respond-child"); + } + + @Test + void constFutureTransformPropagatesContext() throws Exception { + Future result = + testing.runWithSpan( + "parent", + () -> + Future.value("v") + .transform( + fn1( + t -> { + testing.runWithSpan("transform-child", () -> {}); + return Future.value("transformed"); + }))); + assertThat(Await.result(result, AWAIT)).isEqualTo("transformed"); + assertParentAndChild("parent", "transform-child"); + } + + @Test + void constFutureExceptionRespondPropagatesContext() { + testing.runWithSpan( + "parent", + () -> { + Future.exception(new RuntimeException("boom")) + .respond( + fn1( + t -> { + testing.runWithSpan("respond-child", () -> {}); + return BoxedUnit.UNIT; + })); + }); + assertParentAndChild("parent", "respond-child"); + } + + @Test + void futurePoolSingleApplyPropagatesContext() throws Exception { + ExecutorService exec = Executors.newSingleThreadExecutor(named("single-pool")); + cleanup.deferCleanup(() -> shutdown(exec)); + FuturePool pool = FuturePool$.MODULE$.apply(exec); + + Future f = + testing.runWithSpan( + "parent", + () -> + pool.apply( + fn0( + () -> { + testing.runWithSpan("pool-child", () -> {}); + return 1; + }))); + + assertThat(Await.result(f, AWAIT)).isEqualTo(1); + assertParentAndChild("parent", "pool-child"); + } + + @Test + void futurePoolMultiLayerPropagatesContextAcrossPools() throws Exception { + ExecutorService exec1 = Executors.newSingleThreadExecutor(named("pool-1")); + ExecutorService exec2 = Executors.newSingleThreadExecutor(named("pool-2")); + ExecutorService exec3 = Executors.newFixedThreadPool(2, named("pool-3")); + cleanup.deferCleanup(() -> shutdown(exec1)); + cleanup.deferCleanup(() -> shutdown(exec2)); + cleanup.deferCleanup(() -> shutdown(exec3)); + + FuturePool pool1 = FuturePool$.MODULE$.apply(exec1); + FuturePool pool2 = FuturePool$.MODULE$.apply(exec2); + FuturePool pool3 = FuturePool$.MODULE$.apply(exec3); + + Set threadsSeen = ConcurrentHashMap.newKeySet(); + String callerThread = Thread.currentThread().getName(); + + Future chain = + testing.runWithSpan( + "parent", + () -> + pool1 + .apply( + fn0( + () -> { + threadsSeen.add(Thread.currentThread().getName()); + testing.runWithSpan("stage-1", () -> {}); + return "a"; + })) + // hop to pool2: the flatMap continuation runs on exec1 (completion thread) + // via the Promise$Transformer advice, and pool2.apply() re-captures context + // to dispatch the Function0 onto exec2 + .flatMap( + fn1( + a -> + pool2.apply( + fn0( + () -> { + threadsSeen.add(Thread.currentThread().getName()); + testing.runWithSpan("stage-2", () -> {}); + return a + "b"; + })))) + // hop to pool3 + .flatMap( + fn1( + ab -> + pool3.apply( + fn0( + () -> { + threadsSeen.add(Thread.currentThread().getName()); + testing.runWithSpan("stage-3", () -> {}); + return ab + "c"; + })))) + // terminal continuation — no FuturePool, Promise$Transformer only + .map( + fn1( + abc -> { + threadsSeen.add(Thread.currentThread().getName()); + testing.runWithSpan("stage-4-terminal", () -> {}); + return abc + "d"; + }))); + + assertThat(Await.result(chain, AWAIT)).isEqualTo("abcd"); + + // none of the stages should have run on the JUnit caller thread + assertThat(threadsSeen).noneMatch(n -> n.equals(callerThread)); + // each distinct pool must have actually serviced at least one stage + assertThat(threadsSeen).anyMatch(n -> n.startsWith("pool-1")); + assertThat(threadsSeen).anyMatch(n -> n.startsWith("pool-2")); + assertThat(threadsSeen).anyMatch(n -> n.startsWith("pool-3")); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("stage-1").hasParent(trace.getSpan(0)), + span -> span.hasName("stage-2").hasParent(trace.getSpan(0)), + span -> span.hasName("stage-3").hasParent(trace.getSpan(0)), + span -> span.hasName("stage-4-terminal").hasParent(trace.getSpan(0)))); + } + + @Test + void futurePoolInterleavedWithConstFuture() throws Exception { + ExecutorService exec1 = Executors.newSingleThreadExecutor(named("interleave-1")); + ExecutorService exec2 = Executors.newSingleThreadExecutor(named("interleave-2")); + cleanup.deferCleanup(() -> shutdown(exec1)); + cleanup.deferCleanup(() -> shutdown(exec2)); + + FuturePool pool1 = FuturePool$.MODULE$.apply(exec1); + FuturePool pool2 = FuturePool$.MODULE$.apply(exec2); + + Future chain = + testing.runWithSpan( + "parent", + () -> + pool1 + .apply( + fn0( + () -> { + testing.runWithSpan("pool1-stage", () -> {}); + return "a"; + })) + // ConstFuture.flatMap exercises FutureInstrumentation.transform + .flatMap( + fn1( + a -> + Future.value(a + "b") + .flatMap( + fn1( + ab -> { + testing.runWithSpan("const-stage", () -> {}); + return pool2.apply( + fn0( + () -> { + testing.runWithSpan( + "pool2-stage", () -> {}); + return ab + "c"; + })); + }))))); + + assertThat(Await.result(chain, AWAIT)).isEqualTo("abc"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("pool1-stage").hasParent(trace.getSpan(0)), + span -> span.hasName("const-stage").hasParent(trace.getSpan(0)), + span -> span.hasName("pool2-stage").hasParent(trace.getSpan(0)))); + } + + @Test + void futurePoolConcurrentChainsKeepContextsIsolated() throws Exception { + ExecutorService exec = Executors.newFixedThreadPool(4, named("iso-pool")); + cleanup.deferCleanup(() -> shutdown(exec)); + FuturePool pool = FuturePool$.MODULE$.apply(exec); + + int n = 8; + CountDownLatch gate = new CountDownLatch(1); + List> futures = new ArrayList<>(n); + + for (int i = 0; i < n; i++) { + int idx = i; + String parentName = "parent-" + idx; + String childName = "child-" + idx; + testing.runWithSpan( + parentName, + () -> { + Future f = + pool.apply( + fn0( + () -> { + try { + gate.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } + testing.runWithSpan(childName, () -> {}); + return idx; + })); + futures.add(f); + }); + } + + // release all at once so they interleave on the pool threads + gate.countDown(); + for (Future f : futures) { + Await.result(f, AWAIT); + } + + // expect n traces, each with [parent-i, child-i] + List> assertions = new ArrayList<>(); + for (int i = 0; i < n; i++) { + int idx = i; + assertions.add( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent-" + idx).hasNoParent(), + span -> span.hasName("child-" + idx).hasParent(trace.getSpan(0)))); + } + testing.waitAndAssertTraces(assertions); + } + + @Test + void localSchedulerReentrantSubmitPropagatesContext() throws Exception { + // A respond callback re-scheduling work on the LocalScheduler forces + // Activation.submit(Runnable) to fire while already on a scheduler thread. + ExecutorService exec = Executors.newSingleThreadExecutor(named("ls-pool")); + cleanup.deferCleanup(() -> shutdown(exec)); + FuturePool pool = FuturePool$.MODULE$.apply(exec); + + CountDownLatch nestedDone = new CountDownLatch(1); + + testing.runWithSpan( + "parent", + () -> { + pool.apply( + fn0( + () -> { + // on exec thread; scheduler is active here + Future.value("inner") + .respond( + fn1( + t -> { + testing.runWithSpan("nested-child", () -> {}); + nestedDone.countDown(); + return BoxedUnit.UNIT; + })); + return BoxedUnit.UNIT; + })); + }); + + assertThat(nestedDone.await(5, SECONDS)).isTrue(); + assertParentAndChild("parent", "nested-child"); + } + + @Test + void immediateFuturePoolStillPropagatesContext() throws Exception { + FuturePool pool = FuturePool.immediatePool(); + + Future f = + testing.runWithSpan( + "parent", + () -> + pool.apply( + fn0( + () -> { + testing.runWithSpan("immediate-body", () -> {}); + return 1; + })) + .map( + fn1( + v -> { + testing.runWithSpan("immediate-map", () -> {}); + return v + 1; + }))); + + assertThat((int) Await.result(f, AWAIT)).isEqualTo(2); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> span.hasName("immediate-body").hasParent(trace.getSpan(0)), + span -> span.hasName("immediate-map").hasParent(trace.getSpan(0)))); + } + + private static void assertParentAndChild(String parentName, String childName) { + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName(parentName).hasNoParent(), + span -> span.hasName(childName).hasParent(trace.getSpan(0)))); + } + + private static Function0 fn0(Supplier s) { + return new AbstractFunction0() { + @Override + public T apply() { + return s.get(); + } + }; + } + + private static Function1 fn1(Function f) { + return new AbstractFunction1() { + @Override + public R apply(T t) { + return f.apply(t); + } + }; + } + + private static ThreadFactory named(String prefix) { + AtomicInteger n = new AtomicInteger(); + return r -> { + Thread t = new Thread(r, prefix + "-" + n.getAndIncrement()); + t.setDaemon(true); + return t; + }; + } + + private static void shutdown(ExecutorService exec) { + exec.shutdownNow(); + try { + exec.awaitTermination(5, SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Utils.java b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Utils.java index 1b0678eb2af1..2aa61fac19f0 100644 --- a/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Utils.java +++ b/instrumentation/finagle-http-23.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/finaglehttp/v23_11/Utils.java @@ -22,7 +22,6 @@ class Utils { static Http.Client createClient(ClientType clientType) { Http.Client client = Http.client() - .withNoHttp2() .withTransport() .readTimeout(Duration.fromMilliseconds(READ_TIMEOUT.toMillis())) .withTransport() diff --git a/settings.gradle.kts b/settings.gradle.kts index b4a3af5de8d1..31e991442d3d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -288,6 +288,7 @@ include(":instrumentation:executors:testing") include(":instrumentation:external-annotations:javaagent") include(":instrumentation:external-annotations:javaagent-unit-tests") include(":instrumentation:failsafe-3.0:library") +include(":instrumentation:finagle-http-23.11:compile-stub") include(":instrumentation:finagle-http-23.11:javaagent") include(":instrumentation:finatra-2.9:javaagent") include(":instrumentation:geode-1.4:javaagent") diff --git a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/util/ContextStorageCloser.java b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/util/ContextStorageCloser.java index 524901c82cf2..6d51c12a10da 100644 --- a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/util/ContextStorageCloser.java +++ b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/util/ContextStorageCloser.java @@ -60,7 +60,10 @@ boolean runWithRestore(AutoCloseable target) { } catch (Throwable t) { restore(); if (t instanceof AssertionError) { - System.err.println(); + System.err.println("AssertionError " + t); + for (StackTraceElement stackTraceElement : t.getStackTrace()) { + System.err.println("\t" + stackTraceElement); + } for (Map.Entry threadEntry : Thread.getAllStackTraces().entrySet()) { System.err.println("Thread " + threadEntry.getKey());