Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.jaxws.v2_0.metro.v2_2;

import static java.util.Objects.requireNonNull;
import static java.util.logging.Level.FINE;

import com.sun.xml.ws.api.message.Packet;
import io.opentelemetry.api.trace.Span;
Expand Down Expand Up @@ -158,14 +159,9 @@ private static String invokeSafely(MethodHandle methodHandle, Object httpServlet
// servlet path to the span name
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/10986
return null;
} catch (RuntimeException | Error e) {
throw e;
} catch (Throwable t) {
/*
* This is impossible, because the methods being invoked do not throw checked exceptions,
* and unchecked exceptions and errors are handled above
*/
throw new AssertionError(t);
logger.log(FINE, "Failed to get servlet path for jaxws metro span name", t);
return null;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.time.Duration;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -58,8 +59,8 @@ public static Timer onEnter() {
public static void onExit(
@Advice.Enter Timer timer,
@Advice.This Consumer<?, ?> consumer,
@Advice.Return ConsumerRecords<?, ?> records,
@Advice.Thrown Throwable error) {
@Advice.Return @Nullable ConsumerRecords<?, ?> records,
@Advice.Thrown @Nullable Throwable error) {

// don't create spans when no records were received
if (records == null || records.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class KafkaTestUtil {
private static Method getConsumerPollDurationMethod() {
try {
return Consumer.class.getMethod("poll", Duration.class);
} catch (NoSuchMethodException e) {
} catch (NoSuchMethodException ignored) {
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ final class KafkaBatchProcessSpanLinksExtractor implements SpanLinksExtractor<Ka

KafkaBatchProcessSpanLinksExtractor(TextMapPropagator propagator) {
this.singleRecordLinkExtractor =
new PropagatorBasedSpanLinksExtractor<>(propagator, KafkaConsumerRecordGetter.INSTANCE);
new PropagatorBasedSpanLinksExtractor<>(propagator, new KafkaConsumerRecordGetter());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
import javax.annotation.Nullable;
import org.apache.kafka.common.header.Header;

enum KafkaConsumerRecordGetter implements TextMapGetter<KafkaProcessRequest> {
INSTANCE;

class KafkaConsumerRecordGetter implements TextMapGetter<KafkaProcessRequest> {
@Override
public Iterable<String> keys(KafkaProcessRequest carrier) {
return StreamSupport.stream(carrier.getRecord().headers().spliterator(), false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ public Instrumenter<KafkaProcessRequest, Void> createConsumerProcessInstrumenter
builder.addSpanLinksExtractor(
new PropagatorBasedSpanLinksExtractor<>(
openTelemetry.getPropagators().getTextMapPropagator(),
KafkaConsumerRecordGetter.INSTANCE));
new KafkaConsumerRecordGetter()));
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
} else {
return builder.buildConsumerInstrumenter(KafkaConsumerRecordGetter.INSTANCE);
return builder.buildConsumerInstrumenter(new KafkaConsumerRecordGetter());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ protected static String getKafkaConnectUrl() {
kafkaConnect.getMappedPort(CONNECT_REST_PORT_INTERNAL));
}

protected static String getInternalKafkaBoostrapServers() {
protected static String getInternalKafkaBootstrapServers() {
return KAFKA_NETWORK_ALIAS + ":" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT;
}

protected static String getKafkaBoostrapServers() {
protected static String getKafkaBootstrapServers() {
return kafka.getHost() + ":" + kafkaExposedPort;
}

Expand Down Expand Up @@ -279,7 +279,7 @@ private void setupKafkaConnect() {
.withEnv(
"OTEL_SEMCONV_STABILITY_OPT_IN",
System.getProperty("otel.semconv-stability.opt-in"))
.withEnv("CONNECT_BOOTSTRAP_SERVERS", getInternalKafkaBoostrapServers())
.withEnv("CONNECT_BOOTSTRAP_SERVERS", getInternalKafkaBootstrapServers())
.withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", KAFKA_CONNECT_NETWORK_ALIAS)
.withEnv("CONNECT_PLUGIN_PATH", PLUGIN_PATH_CONTAINER)
.withEnv(
Expand Down Expand Up @@ -326,7 +326,7 @@ protected void awaitForTopicCreation(String topicName) {

protected AdminClient createAdminClient() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers());
return KafkaAdminClient.create(properties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void testSingleMessage() throws IOException {
awaitForTopicCreation(testTopicName);

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

Expand Down Expand Up @@ -180,7 +180,7 @@ void testMultiTopic() throws IOException {
awaitForTopicCreation(topicName3);

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10); // to send messages in one batch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void testSingleMessage() throws IOException {
awaitForTopicCreation(testTopicName);

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

Expand Down Expand Up @@ -217,7 +217,7 @@ void testMultiTopic() throws IOException {
awaitForTopicCreation(topicName3);

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBoostrapServers());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10); // to send messages in one batch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.KafkaStreamsSingletons.instrumenter;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.StateHolder.HOLDER;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.StateHolder.holder;
import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
Expand All @@ -18,6 +18,7 @@
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -44,13 +45,13 @@ public void transform(TypeTransformer transformer) {
public static class NextRecordAdvice {

@Advice.OnMethodExit(suppress = Throwable.class, inline = false)
public static void onExit(@Advice.Return StampedRecord record) {
public static void onExit(@Advice.Return @Nullable StampedRecord record) {
if (record == null) {
return;
}

StateHolder holder = HOLDER.get();
if (holder == null) {
StateHolder stateHolder = holder().get();
if (stateHolder == null) {
// somehow nextRecord() was called outside of process()
return;
}
Expand All @@ -66,7 +67,7 @@ public static void onExit(@Advice.Return StampedRecord record) {
return;
}
Context context = instrumenter().start(parentContext, request);
holder.set(request, context, context.makeCurrent());
stateHolder.set(request, context, context.makeCurrent());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -46,9 +47,10 @@ public static class DeserializeAdvice {

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class, inline = false)
@Nullable
public static ConsumerRecord<?, ?> onExit(
@Advice.Argument(1) ConsumerRecord<?, ?> incoming,
@Advice.Return ConsumerRecord<?, ?> result) {
@Advice.Return @Nullable ConsumerRecord<?, ?> result) {
if (result == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.asm.Advice.AssignReturned;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -43,9 +44,10 @@ public static class SaveHeadersAdvice {

@AssignReturned.ToReturned
@Advice.OnMethodExit(suppress = Throwable.class, inline = false)
@Nullable
public static ConsumerRecord<?, ?> saveHeaders(
@Advice.Argument(0) ConsumerRecord<?, ?> incoming,
@Advice.Return ConsumerRecord<?, ?> result) {
@Advice.Return @Nullable ConsumerRecord<?, ?> result) {
if (result == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
import javax.annotation.Nullable;

public class StateHolder {
public static final ThreadLocal<StateHolder> HOLDER = new ThreadLocal<>();
private static final ThreadLocal<StateHolder> holder = new ThreadLocal<>();

@Nullable private KafkaProcessRequest request;
@Nullable private Context context;
@Nullable private Scope scope;

public static ThreadLocal<StateHolder> holder() {
return holder;
}

public void closeScope() {
scope.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
package io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11;

import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.KafkaStreamsSingletons.instrumenter;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.StateHolder.HOLDER;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.v0_11.StateHolder.holder;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -38,19 +39,23 @@ public static class ProcessAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
public static StateHolder onEnter() {
StateHolder holder = new StateHolder();
HOLDER.set(holder);
holder().set(holder);
return holder;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false)
public static void stopSpan(
@Advice.Enter StateHolder holder, @Advice.Thrown Throwable throwable) {
HOLDER.remove();
@Advice.Enter @Nullable StateHolder stateHolder,
@Advice.Thrown @Nullable Throwable throwable) {
holder().remove();
if (stateHolder == null) {
return;
}

Context context = holder.getContext();
Context context = stateHolder.getContext();
if (context != null) {
holder.closeScope();
instrumenter().end(context, holder.getRequest(), null, throwable);
stateHolder.closeScope();
instrumenter().end(context, stateHolder.getRequest(), null, throwable);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static boolean onEnter() {
return KafkaClientsConsumerProcessTracing.setWrappingEnabled(false);
}

@Advice.OnMethodExit(suppress = Throwable.class, inline = false)
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false)
public static void onExit(@Advice.Enter boolean previousValue) {
KafkaClientsConsumerProcessTracing.setWrappingEnabled(previousValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.time.Duration;
Expand All @@ -37,7 +38,6 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.wait.strategy.Wait;
Expand All @@ -49,6 +49,8 @@ abstract class KafkaStreamsBaseTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();

protected static final String STREAM_PENDING = "test.pending";
protected static final String STREAM_PROCESSED = "test.processed";

Expand All @@ -68,6 +70,7 @@ static void setup() throws ExecutionException, InterruptedException, TimeoutExce
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.Kafka.*Server\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1));
kafka.start();
cleanup.deferAfterAll(kafka::stop);

// create test topic
try (AdminClient adminClient =
Expand All @@ -82,6 +85,7 @@ static void setup() throws ExecutionException, InterruptedException, TimeoutExce
}

producer = new KafkaProducer<>(producerProps(kafka.getBootstrapServers()));
cleanup.deferAfterAll(producer);

Map<String, Object> consumerProps =
ImmutableMap.of(
Expand All @@ -100,6 +104,7 @@ static void setup() throws ExecutionException, InterruptedException, TimeoutExce
"value.deserializer",
StringDeserializer.class);
consumer = new KafkaConsumer<>(consumerProps);
cleanup.deferAfterAll(consumer);
consumer.subscribe(
singleton(STREAM_PROCESSED),
new ConsumerRebalanceListener() {
Expand All @@ -113,13 +118,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> collection) {
});
}

@AfterAll
static void cleanup() {
consumer.close();
producer.close();
kafka.stop();
}

static Map<String, Object> producerProps(String servers) {
// values copied from spring's KafkaTestUtils
return ImmutableMap.of(
Expand Down
Loading