From 1ca95892ab0c9a283c4a798fed0df4c279d56237 Mon Sep 17 00:00:00 2001 From: wpessers Date: Sat, 4 Apr 2026 19:11:09 +0100 Subject: [PATCH 1/5] feat(kafka-clients): handle readonly record headers on context injection through producer wrapper --- .../v2_6/internal/KafkaProducerTelemetry.java | 7 ++++- .../v2_6/ExceptionHandlingTest.java | 29 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java index c70b543d3d77..73d4a90418e9 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java @@ -93,7 +93,12 @@ public Future buildAndInjectSpan( Context context = producerInstrumenter.start(parentContext, request); if (producerPropagationEnabled) { - propagator.inject(context, record.headers(), SETTER); + try { + propagator.inject(context, record.headers(), SETTER); + } catch (Throwable t) { + // it can happen if headers are read only (when record is sent second time) + logger.log(WARNING, "failed to inject span context. sending record second time?", t); + } } try (Scope ignored = context.makeCurrent()) { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java index ca9b376a7322..6b542b2610b0 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java @@ -5,14 +5,18 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; import java.lang.reflect.Proxy; import java.time.Duration; +import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -57,4 +61,29 @@ void testProducerExceptionPropagatesToCaller() { .isInstanceOf(IllegalStateException.class) .hasMessage("can't invoke"); } + + @Test + @SuppressWarnings({"unchecked"}) + void testProducerHandlesReadOnlyHeaders() { + Producer producer = + (Producer) + Proxy.newProxyInstance( + ExceptionHandlingTest.class.getClassLoader(), + new Class[] {Producer.class}, + (proxy, method, args) -> { + if ("send".equals(method.getName())) { + return CompletableFuture.completedFuture(null); + } + throw new IllegalStateException("can't invoke"); + }); + KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build(); + Producer wrappedProducer = telemetry.wrap(producer); + + ProducerRecord record = + new ProducerRecord<>( + "test-topic", null, null, "test-key", "test-value", new RecordHeaders()); + ((RecordHeaders) record.headers()).setReadOnly(); + assertThatNoException() + .isThrownBy(() -> testing.runWithSpan("parent", () -> wrappedProducer.send(record))); + } } From eb17d42877cee7a242262d58119c5ca2c89e3b7c Mon Sep 17 00:00:00 2001 From: wpessers Date: Tue, 7 Apr 2026 22:08:57 +0200 Subject: [PATCH 2/5] refactor: reuse existing retry logic from KafkaPropagation.java --- .../v2_6/internal/KafkaProducerTelemetry.java | 36 ++++++------------- .../OpenTelemetryProducerInterceptor.java | 2 +- .../v0_11/internal/KafkaPropagation.java | 18 ++++++---- 3 files changed, 24 insertions(+), 32 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java index 73d4a90418e9..13c117ab242a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java @@ -5,23 +5,18 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal; -import static java.util.logging.Level.WARNING; - import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaHeadersSetter; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest; +import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaPropagation; import java.util.concurrent.Future; import java.util.function.BiFunction; -import java.util.logging.Logger; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.header.Headers; /** * Helper for producer-side instrumentation. @@ -30,10 +25,6 @@ * at any time. */ public class KafkaProducerTelemetry { - private static final Logger logger = Logger.getLogger(KafkaProducerTelemetry.class.getName()); - - private static final TextMapSetter SETTER = KafkaHeadersSetter.INSTANCE; - private final TextMapPropagator propagator; private final Instrumenter producerInstrumenter; private final boolean producerPropagationEnabled; @@ -52,24 +43,24 @@ public KafkaProducerTelemetry( * * @param record the producer record to inject span info. */ - public void buildAndInjectSpan(ProducerRecord record, String clientId) { + public ProducerRecord buildAndInjectSpan( + ProducerRecord record, String clientId) { Context parentContext = Context.current(); KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId); if (!producerInstrumenter.shouldStart(parentContext, request)) { - return; + return record; } Context context = producerInstrumenter.start(parentContext, request); - if (producerPropagationEnabled) { - try { - propagator.inject(context, record.headers(), SETTER); - } catch (Throwable t) { - // it can happen if headers are read only (when record is sent second time) - logger.log(WARNING, "failed to inject span context. sending record second time?", t); + try { + if (producerPropagationEnabled) { + record = KafkaPropagation.propagateContext(propagator, context, record); } + } finally { + producerInstrumenter.end(context, request, null, null); } - producerInstrumenter.end(context, request, null, null); + return record; } /** @@ -93,12 +84,7 @@ public Future buildAndInjectSpan( Context context = producerInstrumenter.start(parentContext, request); if (producerPropagationEnabled) { - try { - propagator.inject(context, record.headers(), SETTER); - } catch (Throwable t) { - // it can happen if headers are read only (when record is sent second time) - logger.log(WARNING, "failed to inject span context. sending record second time?", t); - } + record = KafkaPropagation.propagateContext(propagator, context, record); } try (Scope ignored = context.makeCurrent()) { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java index 0e8fa05cad5a..17673d363767 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/OpenTelemetryProducerInterceptor.java @@ -32,7 +32,7 @@ public class OpenTelemetryProducerInterceptor implements ProducerIntercept @CanIgnoreReturnValue public ProducerRecord onSend(ProducerRecord producerRecord) { if (producerTelemetry != null) { - producerTelemetry.buildAndInjectSpan(producerRecord, clientId); + return producerTelemetry.buildAndInjectSpan(producerRecord, clientId); } return producerRecord; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaPropagation.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaPropagation.java index 0c389a447252..b184b726fc71 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaPropagation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaPropagation.java @@ -7,6 +7,7 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.ProducerRecord; @@ -50,8 +51,14 @@ private static boolean hasMaxUsableProduceMagic() { public static ProducerRecord propagateContext( Context context, ProducerRecord record) { + return propagateContext( + GlobalOpenTelemetry.getPropagators().getTextMapPropagator(), context, record); + } + + public static ProducerRecord propagateContext( + TextMapPropagator propagator, Context context, ProducerRecord record) { try { - inject(context, record); + inject(propagator, context, record); } catch (IllegalStateException e) { // headers must be read-only from reused record. try again with new one. record = @@ -63,15 +70,14 @@ record = record.value(), record.headers()); - inject(context, record); + inject(propagator, context, record); } return record; } - private static void inject(Context context, ProducerRecord record) { - GlobalOpenTelemetry.getPropagators() - .getTextMapPropagator() - .inject(context, record.headers(), SETTER); + private static void inject( + TextMapPropagator propagator, Context context, ProducerRecord record) { + propagator.inject(context, record.headers(), SETTER); } private KafkaPropagation() {} From 70209964acfff2ec171fc8ef5608e06c88faf154 Mon Sep 17 00:00:00 2001 From: wpessers Date: Wed, 8 Apr 2026 16:11:18 +0200 Subject: [PATCH 3/5] feat(kafka-clients): end span on synchronous failures --- .../v2_6/internal/KafkaProducerTelemetry.java | 3 + .../v2_6/WrapperSendExceptionTest.java | 55 +++++++++++++++++++ 2 files changed, 58 insertions(+) create mode 100644 instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java index 13c117ab242a..ea4a57b6c84b 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/internal/KafkaProducerTelemetry.java @@ -89,6 +89,9 @@ record = KafkaPropagation.propagateContext(propagator, context, record); try (Scope ignored = context.makeCurrent()) { return sendFn.apply(record, new ProducerCallback(callback, parentContext, context, request)); + } catch (Throwable t) { + producerInstrumenter.end(context, request, null, t); + throw t; } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java new file mode 100644 index 000000000000..831bf1ca950f --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java @@ -0,0 +1,55 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.kafkaclients.v2_6; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import io.opentelemetry.sdk.trace.data.StatusData; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class WrapperSendExceptionTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Test + @SuppressWarnings({"unchecked"}) + void producerSpanEndedWhenSendThrowsSynchronously() { + Producer producer = mock(Producer.class); + when(producer.send(any(), any())).thenThrow(new KafkaException("send failed")); + + KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build(); + Producer wrappedProducer = telemetry.wrap(producer); + + ProducerRecord record = + new ProducerRecord<>("test-topic", "test-key", "test-value"); + + assertThatThrownBy( + () -> testing.runWithSpan("parent", () -> wrappedProducer.send(record))) + .isInstanceOf(KafkaException.class) + .hasMessage("send failed"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("test-topic publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()))); + } +} From 9b48929a258366cc63ec3f97388224a737c7aed7 Mon Sep 17 00:00:00 2001 From: wpessers Date: Wed, 8 Apr 2026 16:17:09 +0200 Subject: [PATCH 4/5] use mock in ExceptionHandlingTest.java --- .../v2_6/ExceptionHandlingTest.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java index 6b542b2610b0..2a03929411dc 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/ExceptionHandlingTest.java @@ -7,6 +7,9 @@ import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; @@ -65,17 +68,9 @@ void testProducerExceptionPropagatesToCaller() { @Test @SuppressWarnings({"unchecked"}) void testProducerHandlesReadOnlyHeaders() { - Producer producer = - (Producer) - Proxy.newProxyInstance( - ExceptionHandlingTest.class.getClassLoader(), - new Class[] {Producer.class}, - (proxy, method, args) -> { - if ("send".equals(method.getName())) { - return CompletableFuture.completedFuture(null); - } - throw new IllegalStateException("can't invoke"); - }); + Producer producer = mock(Producer.class); + when(producer.send(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); + KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build(); Producer wrappedProducer = telemetry.wrap(producer); From 0492746b39fbcae5f2cbd802e5ddd921d41bbc37 Mon Sep 17 00:00:00 2001 From: wpessers Date: Wed, 8 Apr 2026 16:18:25 +0200 Subject: [PATCH 5/5] format WrapperSendExceptionTest.java --- .../kafkaclients/v2_6/WrapperSendExceptionTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java index 831bf1ca950f..fc5371ee7990 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperSendExceptionTest.java @@ -37,8 +37,7 @@ void producerSpanEndedWhenSendThrowsSynchronously() { ProducerRecord record = new ProducerRecord<>("test-topic", "test-key", "test-value"); - assertThatThrownBy( - () -> testing.runWithSpan("parent", () -> wrappedProducer.send(record))) + assertThatThrownBy(() -> testing.runWithSpan("parent", () -> wrappedProducer.send(record))) .isInstanceOf(KafkaException.class) .hasMessage("send failed");