From 53497dc31376c7232ddce435549269d8a98c03b3 Mon Sep 17 00:00:00 2001 From: otelbot <197425009+otelbot@users.noreply.github.com> Date: Thu, 30 Apr 2026 22:55:41 +0000 Subject: [PATCH 1/6] Review fixes for jaxws-2.0-metro-2.2:javaagent Automated code review of instrumentation/jaxws/jaxws-2.0-metro-2.2/javaagent. --- .../v2_0/metro/v2_2/MetroServerSpanNameUpdater.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/instrumentation/jaxws/jaxws-2.0-metro-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jaxws/v2_0/metro/v2_2/MetroServerSpanNameUpdater.java b/instrumentation/jaxws/jaxws-2.0-metro-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jaxws/v2_0/metro/v2_2/MetroServerSpanNameUpdater.java index cb7053f02086..a31a13f15c2a 100644 --- a/instrumentation/jaxws/jaxws-2.0-metro-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jaxws/v2_0/metro/v2_2/MetroServerSpanNameUpdater.java +++ b/instrumentation/jaxws/jaxws-2.0-metro-2.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jaxws/v2_0/metro/v2_2/MetroServerSpanNameUpdater.java @@ -6,6 +6,7 @@ package io.opentelemetry.javaagent.instrumentation.jaxws.v2_0.metro.v2_2; import static java.util.Objects.requireNonNull; +import static java.util.logging.Level.FINE; import com.sun.xml.ws.api.message.Packet; import io.opentelemetry.api.trace.Span; @@ -158,14 +159,9 @@ private static String invokeSafely(MethodHandle methodHandle, Object httpServlet // servlet path to the span name // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/10986 return null; - } catch (RuntimeException | Error e) { - throw e; } catch (Throwable t) { - /* - * This is impossible, because the methods being invoked do not throw checked exceptions, - * and unchecked exceptions and errors are handled above - */ - throw new AssertionError(t); + logger.log(FINE, "Failed to get servlet path for jaxws metro span name", t); + return null; } } } From f08bd9bab7231a682784e4926589d24989bf4165 Mon Sep 17 00:00:00 2001 From: otelbot <197425009+otelbot@users.noreply.github.com> Date: Thu, 30 Apr 2026 23:00:58 +0000 Subject: [PATCH 2/6] Review fixes for kafka-clients-0.11:javaagent Automated code review of instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent. --- .../kafkaclients/v0_11/KafkaConsumerInstrumentation.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java index 05f845d02b4e..fd4c5f5de016 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java @@ -22,6 +22,7 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.time.Duration; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -58,8 +59,8 @@ public static Timer onEnter() { public static void onExit( @Advice.Enter Timer timer, @Advice.This Consumer consumer, - @Advice.Return ConsumerRecords records, - @Advice.Thrown Throwable error) { + @Advice.Return @Nullable ConsumerRecords records, + @Advice.Thrown @Nullable Throwable error) { // don't create spans when no records were received if (records == null || records.isEmpty()) { From e8a3a7a248ce754f297d1f76ca31b9aabc3b98df Mon Sep 17 00:00:00 2001 From: otelbot <197425009+otelbot@users.noreply.github.com> Date: Thu, 30 Apr 2026 23:08:24 +0000 Subject: [PATCH 3/6] Review fixes for kafka-clients-0.11:testing Automated code review of instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing. --- .../kafkaclients/common/v0_11/internal/KafkaTestUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaTestUtil.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaTestUtil.java index ad622acb3ae7..06d659b972fc 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaTestUtil.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaTestUtil.java @@ -16,7 +16,7 @@ class KafkaTestUtil { private static Method getConsumerPollDurationMethod() { try { return Consumer.class.getMethod("poll", Duration.class); - } catch (NoSuchMethodException e) { + } catch (NoSuchMethodException ignored) { return null; } } From 611d6fc1ade168b4a83adf2faf872c61d028e868 Mon Sep 17 00:00:00 2001 From: otelbot <197425009+otelbot@users.noreply.github.com> Date: Thu, 30 Apr 2026 23:18:56 +0000 Subject: [PATCH 4/6] Review fixes for kafka-clients-common-0.11:library Automated code review of instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library. --- .../v0_11/internal/KafkaBatchProcessSpanLinksExtractor.java | 2 +- .../common/v0_11/internal/KafkaConsumerRecordGetter.java | 4 +--- .../common/v0_11/internal/KafkaInstrumenterFactory.java | 4 ++-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaBatchProcessSpanLinksExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaBatchProcessSpanLinksExtractor.java index b3422f21f916..a9f13d9e8e4d 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaBatchProcessSpanLinksExtractor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaBatchProcessSpanLinksExtractor.java @@ -18,7 +18,7 @@ final class KafkaBatchProcessSpanLinksExtractor implements SpanLinksExtractor(propagator, KafkaConsumerRecordGetter.INSTANCE); + new PropagatorBasedSpanLinksExtractor<>(propagator, new KafkaConsumerRecordGetter()); } @Override diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java index a3866824e050..6b96311861da 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaConsumerRecordGetter.java @@ -15,9 +15,7 @@ import javax.annotation.Nullable; import org.apache.kafka.common.header.Header; -enum KafkaConsumerRecordGetter implements TextMapGetter { - INSTANCE; - +class KafkaConsumerRecordGetter implements TextMapGetter { @Override public Iterable keys(KafkaProcessRequest carrier) { return StreamSupport.stream(carrier.getRecord().headers().spliterator(), false) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java index 45e4d0991ebf..7f2e88998da9 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java @@ -143,10 +143,10 @@ public Instrumenter createConsumerProcessInstrumenter builder.addSpanLinksExtractor( new PropagatorBasedSpanLinksExtractor<>( openTelemetry.getPropagators().getTextMapPropagator(), - KafkaConsumerRecordGetter.INSTANCE)); + new KafkaConsumerRecordGetter())); return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } else { - return builder.buildConsumerInstrumenter(KafkaConsumerRecordGetter.INSTANCE); + return builder.buildConsumerInstrumenter(new KafkaConsumerRecordGetter()); } } From b0ad503315bb7560e887b78e87ba7cd3d5ce948b Mon Sep 17 00:00:00 2001 From: otelbot <197425009+otelbot@users.noreply.github.com> Date: Thu, 30 Apr 2026 23:33:43 +0000 Subject: [PATCH 5/6] Review fixes for kafka-connect-2.6:testing Automated code review of instrumentation/kafka/kafka-connect-2.6/testing. --- .../kafkaconnect/v2_6/KafkaConnectSinkTaskBaseTest.java | 8 ++++---- .../kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java | 4 ++-- .../v2_6/PostgresKafkaConnectSinkTaskTest.java | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/KafkaConnectSinkTaskBaseTest.java b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/KafkaConnectSinkTaskBaseTest.java index bb2ab47033fa..87df35dfbf2c 100644 --- a/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/KafkaConnectSinkTaskBaseTest.java +++ b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/KafkaConnectSinkTaskBaseTest.java @@ -122,11 +122,11 @@ protected static String getKafkaConnectUrl() { kafkaConnect.getMappedPort(CONNECT_REST_PORT_INTERNAL)); } - protected static String getInternalKafkaBoostrapServers() { + protected static String getInternalKafkaBootstrapServers() { return KAFKA_NETWORK_ALIAS + ":" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT; } - protected static String getKafkaBoostrapServers() { + protected static String getKafkaBootstrapServers() { return kafka.getHost() + ":" + kafkaExposedPort; } @@ -279,7 +279,7 @@ private void setupKafkaConnect() { .withEnv( "OTEL_SEMCONV_STABILITY_OPT_IN", System.getProperty("otel.semconv-stability.opt-in")) - .withEnv("CONNECT_BOOTSTRAP_SERVERS", getInternalKafkaBoostrapServers()) + .withEnv("CONNECT_BOOTSTRAP_SERVERS", getInternalKafkaBootstrapServers()) .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", KAFKA_CONNECT_NETWORK_ALIAS) .withEnv("CONNECT_PLUGIN_PATH", PLUGIN_PATH_CONTAINER) .withEnv( @@ -326,7 +326,7 @@ protected void awaitForTopicCreation(String topicName) { protected AdminClient createAdminClient() { Properties properties = new Properties(); - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers()); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers()); return KafkaAdminClient.create(properties); } diff --git a/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java index 56d2185728fe..05ac11565c1a 100644 --- a/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java +++ b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/MongoKafkaConnectSinkTaskTest.java @@ -106,7 +106,7 @@ void testSingleMessage() throws IOException { awaitForTopicCreation(testTopicName); Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -180,7 +180,7 @@ void testMultiTopic() throws IOException { awaitForTopicCreation(topicName3); Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10); // to send messages in one batch diff --git a/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/PostgresKafkaConnectSinkTaskTest.java b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/PostgresKafkaConnectSinkTaskTest.java index ced890ce09d8..843fffda5ccd 100644 --- a/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/PostgresKafkaConnectSinkTaskTest.java +++ b/instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/PostgresKafkaConnectSinkTaskTest.java @@ -121,7 +121,7 @@ void testSingleMessage() throws IOException { awaitForTopicCreation(testTopicName); Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -217,7 +217,7 @@ void testMultiTopic() throws IOException { awaitForTopicCreation(topicName3); Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10); // to send messages in one batch From 7054282087c388ded0c22960767b8214af0dcd33 Mon Sep 17 00:00:00 2001 From: otelbot <197425009+otelbot@users.noreply.github.com> Date: Thu, 30 Apr 2026 23:41:49 +0000 Subject: [PATCH 6/6] Review fixes for kafka-streams-0.11:javaagent Automated code review of instrumentation/kafka/kafka-streams-0.11/javaagent. --- .../v0_11/PartitionGroupInstrumentation.java | 11 ++++++----- .../RecordDeserializerInstrumentation.java | 4 +++- ...NodeRecordDeserializerInstrumentation.java | 4 +++- .../kafkastreams/v0_11/StateHolder.java | 6 +++++- .../v0_11/StreamTaskInstrumentation.java | 19 ++++++++++++------- .../v0_11/StreamThreadInstrumentation.java | 2 +- .../v0_11/KafkaStreamsBaseTest.java | 14 ++++++-------- 7 files changed, 36 insertions(+), 24 deletions(-) diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/PartitionGroupInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/PartitionGroupInstrumentation.java index 05ac9fc0d9a5..912a8cd6d8c5 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/PartitionGroupInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/PartitionGroupInstrumentation.java @@ -7,7 +7,7 @@ import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.KafkaStreamsSingletons.instrumenter; -import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.StateHolder.HOLDER; +import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.StateHolder.holder; import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.returns; @@ -18,6 +18,7 @@ import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -44,13 +45,13 @@ public void transform(TypeTransformer transformer) { public static class NextRecordAdvice { @Advice.OnMethodExit(suppress = Throwable.class, inline = false) - public static void onExit(@Advice.Return StampedRecord record) { + public static void onExit(@Advice.Return @Nullable StampedRecord record) { if (record == null) { return; } - StateHolder holder = HOLDER.get(); - if (holder == null) { + StateHolder stateHolder = holder().get(); + if (stateHolder == null) { // somehow nextRecord() was called outside of process() return; } @@ -66,7 +67,7 @@ public static void onExit(@Advice.Return StampedRecord record) { return; } Context context = instrumenter().start(parentContext, request); - holder.set(request, context, context.makeCurrent()); + stateHolder.set(request, context, context.makeCurrent()); } } } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/RecordDeserializerInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/RecordDeserializerInstrumentation.java index e07aadd7d242..b9470a504b67 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/RecordDeserializerInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/RecordDeserializerInstrumentation.java @@ -15,6 +15,7 @@ import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; @@ -46,9 +47,10 @@ public static class DeserializeAdvice { @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + @Nullable public static ConsumerRecord onExit( @Advice.Argument(1) ConsumerRecord incoming, - @Advice.Return ConsumerRecord result) { + @Advice.Return @Nullable ConsumerRecord result) { if (result == null) { return null; } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/SourceNodeRecordDeserializerInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/SourceNodeRecordDeserializerInstrumentation.java index bfa76016745d..1f2f8651f0c0 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/SourceNodeRecordDeserializerInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/SourceNodeRecordDeserializerInstrumentation.java @@ -13,6 +13,7 @@ import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice.AssignReturned; import net.bytebuddy.description.type.TypeDescription; @@ -43,9 +44,10 @@ public static class SaveHeadersAdvice { @AssignReturned.ToReturned @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + @Nullable public static ConsumerRecord saveHeaders( @Advice.Argument(0) ConsumerRecord incoming, - @Advice.Return ConsumerRecord result) { + @Advice.Return @Nullable ConsumerRecord result) { if (result == null) { return null; } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StateHolder.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StateHolder.java index 6920f52e9d05..1052344a2680 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StateHolder.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StateHolder.java @@ -11,12 +11,16 @@ import javax.annotation.Nullable; public class StateHolder { - public static final ThreadLocal HOLDER = new ThreadLocal<>(); + private static final ThreadLocal holder = new ThreadLocal<>(); @Nullable private KafkaProcessRequest request; @Nullable private Context context; @Nullable private Scope scope; + public static ThreadLocal holder() { + return holder; + } + public void closeScope() { scope.close(); } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StreamTaskInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StreamTaskInstrumentation.java index 681b35953839..446d83061b6a 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StreamTaskInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StreamTaskInstrumentation.java @@ -6,13 +6,14 @@ package io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.KafkaStreamsSingletons.instrumenter; -import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.StateHolder.HOLDER; +import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.StateHolder.holder; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import javax.annotation.Nullable; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -38,19 +39,23 @@ public static class ProcessAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) public static StateHolder onEnter() { StateHolder holder = new StateHolder(); - HOLDER.set(holder); + holder().set(holder); return holder; } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) public static void stopSpan( - @Advice.Enter StateHolder holder, @Advice.Thrown Throwable throwable) { - HOLDER.remove(); + @Advice.Enter @Nullable StateHolder stateHolder, + @Advice.Thrown @Nullable Throwable throwable) { + holder().remove(); + if (stateHolder == null) { + return; + } - Context context = holder.getContext(); + Context context = stateHolder.getContext(); if (context != null) { - holder.closeScope(); - instrumenter().end(context, holder.getRequest(), null, throwable); + stateHolder.closeScope(); + instrumenter().end(context, stateHolder.getRequest(), null, throwable); } } } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StreamThreadInstrumentation.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StreamThreadInstrumentation.java index db34fa5a0eaf..2c79151002eb 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StreamThreadInstrumentation.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/StreamThreadInstrumentation.java @@ -34,7 +34,7 @@ public static boolean onEnter() { return KafkaClientsConsumerProcessTracing.setWrappingEnabled(false); } - @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) public static void onExit(@Advice.Enter boolean previousValue) { KafkaClientsConsumerProcessTracing.setWrappingEnabled(previousValue); } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/KafkaStreamsBaseTest.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/KafkaStreamsBaseTest.java index 8b99654d0c18..04446089258f 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/KafkaStreamsBaseTest.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/v0_11/KafkaStreamsBaseTest.java @@ -15,6 +15,7 @@ import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import java.time.Duration; @@ -37,7 +38,6 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.containers.wait.strategy.Wait; @@ -49,6 +49,8 @@ abstract class KafkaStreamsBaseTest { @RegisterExtension static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + @RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create(); + protected static final String STREAM_PENDING = "test.pending"; protected static final String STREAM_PROCESSED = "test.processed"; @@ -68,6 +70,7 @@ static void setup() throws ExecutionException, InterruptedException, TimeoutExce .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.Kafka.*Server\\).*", 1)) .withStartupTimeout(Duration.ofMinutes(1)); kafka.start(); + cleanup.deferAfterAll(kafka::stop); // create test topic try (AdminClient adminClient = @@ -82,6 +85,7 @@ static void setup() throws ExecutionException, InterruptedException, TimeoutExce } producer = new KafkaProducer<>(producerProps(kafka.getBootstrapServers())); + cleanup.deferAfterAll(producer); Map consumerProps = ImmutableMap.of( @@ -100,6 +104,7 @@ static void setup() throws ExecutionException, InterruptedException, TimeoutExce "value.deserializer", StringDeserializer.class); consumer = new KafkaConsumer<>(consumerProps); + cleanup.deferAfterAll(consumer); consumer.subscribe( singleton(STREAM_PROCESSED), new ConsumerRebalanceListener() { @@ -113,13 +118,6 @@ public void onPartitionsAssigned(Collection collection) { }); } - @AfterAll - static void cleanup() { - consumer.close(); - producer.close(); - kafka.stop(); - } - static Map producerProps(String servers) { // values copied from spring's KafkaTestUtils return ImmutableMap.of(