diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java index c70b543d3d77..ea4a57b6c84b 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java @@ -5,23 +5,18 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; -import static java.util.logging.Level.WARNING; - import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaHeadersSetter; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaPropagation; import java.util.concurrent.Future; import java.util.function.BiFunction; -import java.util.logging.Logger; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.header.Headers; /** * Helper for producer-side instrumentation. @@ -30,10 +25,6 @@ * at any time. */ public class KafkaProducerTelemetry { - private static final Logger logger = Logger.getLogger(KafkaProducerTelemetry.class.getName()); - - private static final TextMapSetter SETTER = KafkaHeadersSetter.INSTANCE; - private final TextMapPropagator propagator; private final Instrumenter producerInstrumenter; private final boolean producerPropagationEnabled; @@ -52,24 +43,24 @@ public KafkaProducerTelemetry( * * @param record the producer record to inject span info. */ - public void buildAndInjectSpan(ProducerRecord record, String clientId) { + public ProducerRecord buildAndInjectSpan( + ProducerRecord record, String clientId) { Context parentContext = Context.current(); KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId); if (!producerInstrumenter.shouldStart(parentContext, request)) { - return; + return record; } Context context = producerInstrumenter.start(parentContext, request); - if (producerPropagationEnabled) { - try { - propagator.inject(context, record.headers(), SETTER); - } catch (Throwable t) { - // it can happen if headers are read only (when record is sent second time) - logger.log(WARNING, "failed to inject span context. sending record second time?", t); + try { + if (producerPropagationEnabled) { + record = KafkaPropagation.propagateContext(propagator, context, record); } + } finally { + producerInstrumenter.end(context, request, null, null); } - producerInstrumenter.end(context, request, null, null); + return record; } /** @@ -93,11 +84,14 @@ public Future buildAndInjectSpan( Context context = producerInstrumenter.start(parentContext, request); if (producerPropagationEnabled) { - propagator.inject(context, record.headers(), SETTER); + record = KafkaPropagation.propagateContext(propagator, context, record); } try (Scope ignored = context.makeCurrent()) { return sendFn.apply(record, new ProducerCallback(callback, parentContext, context, request)); + } catch (Throwable t) { + producerInstrumenter.end(context, request, null, t); + throw t; } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java index 0e8fa05cad5a..17673d363767 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java @@ -32,7 +32,7 @@ public class OpenTelemetryProducerInterceptor implements ProducerIntercept @CanIgnoreReturnValue public ProducerRecord onSend(ProducerRecord producerRecord) { if (producerTelemetry != null) { - producerTelemetry.buildAndInjectSpan(producerRecord, clientId); + return producerTelemetry.buildAndInjectSpan(producerRecord, clientId); } return producerRecord; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java index ca9b376a7322..2a03929411dc 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java @@ -5,14 +5,21 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; import java.lang.reflect.Proxy; import java.time.Duration; +import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -57,4 +64,21 @@ void testProducerExceptionPropagatesToCaller() { .isInstanceOf(IllegalStateException.class) .hasMessage("can't invoke"); } + + @Test + @SuppressWarnings({"unchecked"}) + void testProducerHandlesReadOnlyHeaders() { + Producer producer = mock(Producer.class); + when(producer.send(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); + + KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build(); + Producer wrappedProducer = telemetry.wrap(producer); + + ProducerRecord record = + new ProducerRecord<>( + "test-topic", null, null, "test-key", "test-value", new RecordHeaders()); + ((RecordHeaders) record.headers()).setReadOnly(); + assertThatNoException() + .isThrownBy(() -> testing.runWithSpan("parent", () -> wrappedProducer.send(record))); + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java new file mode 100644 index 000000000000..fc5371ee7990 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java @@ -0,0 +1,54 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import io.opentelemetry.sdk.trace.data.StatusData; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class WrapperSendExceptionTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Test + @SuppressWarnings({"unchecked"}) + void producerSpanEndedWhenSendThrowsSynchronously() { + Producer producer = mock(Producer.class); + when(producer.send(any(), any())).thenThrow(new KafkaException("send failed")); + + KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build(); + Producer wrappedProducer = telemetry.wrap(producer); + + ProducerRecord record = + new ProducerRecord<>("test-topic", "test-key", "test-value"); + + assertThatThrownBy(() -> testing.runWithSpan("parent", () -> wrappedProducer.send(record))) + .isInstanceOf(KafkaException.class) + .hasMessage("send failed"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("test-topic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()))); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaPropagation.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaPropagation.java index 0c389a447252..b184b726fc71 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaPropagation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaPropagation.java @@ -7,6 +7,7 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.ProducerRecord; @@ -50,8 +51,14 @@ private static boolean hasMaxUsableProduceMagic() { public static ProducerRecord propagateContext( Context context, ProducerRecord record) { + return propagateContext( + GlobalOpenTelemetry.getPropagators().getTextMapPropagator(), context, record); + } + + public static ProducerRecord propagateContext( + TextMapPropagator propagator, Context context, ProducerRecord record) { try { - inject(context, record); + inject(propagator, context, record); } catch (IllegalStateException e) { // headers must be read-only from reused record. try again with new one. record = @@ -63,15 +70,14 @@ record = record.value(), record.headers()); - inject(context, record); + inject(propagator, context, record); } return record; } - private static void inject(Context context, ProducerRecord record) { - GlobalOpenTelemetry.getPropagators() - .getTextMapPropagator() - .inject(context, record.headers(), SETTER); + private static void inject( + TextMapPropagator propagator, Context context, ProducerRecord record) { + propagator.inject(context, record.headers(), SETTER); } private KafkaPropagation() {}