Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,18 @@

package io.opentelemetry.instrumentation.kafkaclients.v2_6.internal;

import static java.util.logging.Level.WARNING;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaHeadersSetter;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaPropagation;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;

/**
* Helper for producer-side instrumentation.
Expand All @@ -30,10 +25,6 @@
* at any time.
*/
public class KafkaProducerTelemetry {
private static final Logger logger = Logger.getLogger(KafkaProducerTelemetry.class.getName());

private static final TextMapSetter<Headers> SETTER = KafkaHeadersSetter.INSTANCE;

private final TextMapPropagator propagator;
private final Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter;
private final boolean producerPropagationEnabled;
Expand All @@ -52,24 +43,24 @@ public KafkaProducerTelemetry(
*
* @param record the producer record to inject span info.
*/
public <K, V> void buildAndInjectSpan(ProducerRecord<K, V> record, String clientId) {
public <K, V> ProducerRecord<K, V> buildAndInjectSpan(
ProducerRecord<K, V> record, String clientId) {
Context parentContext = Context.current();

KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
if (!producerInstrumenter.shouldStart(parentContext, request)) {
return;
return record;
}

Context context = producerInstrumenter.start(parentContext, request);
if (producerPropagationEnabled) {
try {
propagator.inject(context, record.headers(), SETTER);
} catch (Throwable t) {
// it can happen if headers are read only (when record is sent second time)
logger.log(WARNING, "failed to inject span context. sending record second time?", t);
try {
if (producerPropagationEnabled) {
record = KafkaPropagation.propagateContext(propagator, context, record);
}
} finally {
producerInstrumenter.end(context, request, null, null);
}
producerInstrumenter.end(context, request, null, null);
return record;
}

/**
Expand All @@ -93,11 +84,14 @@ public <K, V> Future<RecordMetadata> buildAndInjectSpan(

Context context = producerInstrumenter.start(parentContext, request);
if (producerPropagationEnabled) {
propagator.inject(context, record.headers(), SETTER);
record = KafkaPropagation.propagateContext(propagator, context, record);
}

try (Scope ignored = context.makeCurrent()) {
return sendFn.apply(record, new ProducerCallback(callback, parentContext, context, request));
} catch (Throwable t) {
producerInstrumenter.end(context, request, null, t);
throw t;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class OpenTelemetryProducerInterceptor<K, V> implements ProducerIntercept
@CanIgnoreReturnValue
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
if (producerTelemetry != null) {
producerTelemetry.buildAndInjectSpan(producerRecord, clientId);
return producerTelemetry.buildAndInjectSpan(producerRecord, clientId);
}
return producerRecord;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@

package io.opentelemetry.instrumentation.kafkaclients.v2_6;

import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand Down Expand Up @@ -57,4 +64,21 @@ void testProducerExceptionPropagatesToCaller() {
.isInstanceOf(IllegalStateException.class)
.hasMessage("can't invoke");
}

@Test
@SuppressWarnings({"unchecked"})
void testProducerHandlesReadOnlyHeaders() {
  // Stubbed producer whose send() completes immediately without touching a broker.
  Producer<String, String> delegate = mock(Producer.class);
  when(delegate.send(any(), any())).thenReturn(CompletableFuture.completedFuture(null));

  // Wrap the mock so the telemetry path runs during send().
  Producer<String, String> instrumented =
      KafkaTelemetry.builder(testing.getOpenTelemetry()).build().wrap(delegate);

  // Freeze the record's headers, as happens when a record is sent a second time;
  // context injection must cope with this instead of throwing.
  ProducerRecord<String, String> record =
      new ProducerRecord<>(
          "test-topic", null, null, "test-key", "test-value", new RecordHeaders());
  ((RecordHeaders) record.headers()).setReadOnly();

  assertThatNoException()
      .isThrownBy(() -> testing.runWithSpan("parent", () -> instrumented.send(record)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.v2_6;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.sdk.trace.data.StatusData;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Verifies that when the underlying {@link Producer#send} throws synchronously, the wrapping
 * telemetry rethrows the exception to the caller and still ends the producer span with an
 * error status instead of leaking it.
 */
class WrapperSendExceptionTest {

  @RegisterExtension
  static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();

  @Test
  @SuppressWarnings({"unchecked"})
  void producerSpanEndedWhenSendThrowsSynchronously() {
    // Mock producer that fails synchronously inside send(), before any Future is returned.
    Producer<String, String> producer = mock(Producer.class);
    when(producer.send(any(), any())).thenThrow(new KafkaException("send failed"));

    KafkaTelemetry telemetry = KafkaTelemetry.builder(testing.getOpenTelemetry()).build();
    Producer<String, String> wrappedProducer = telemetry.wrap(producer);

    ProducerRecord<String, String> record =
        new ProducerRecord<>("test-topic", "test-key", "test-value");

    // The wrapper must propagate the original exception unchanged.
    assertThatThrownBy(() -> testing.runWithSpan("parent", () -> wrappedProducer.send(record)))
        .isInstanceOf(KafkaException.class)
        .hasMessage("send failed");

    // Even though send() threw, the producer span must be ended, marked as error,
    // and parented under the surrounding "parent" span.
    testing.waitAndAssertTraces(
        trace ->
            trace.hasSpansSatisfyingExactly(
                span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
                span ->
                    span.hasName("test-topic publish")
                        .hasKind(SpanKind.PRODUCER)
                        .hasParent(trace.getSpan(0))
                        .hasStatus(StatusData.error())));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -50,8 +51,14 @@ private static boolean hasMaxUsableProduceMagic() {

/**
 * Injects the span context from {@code context} into the record's headers using the globally
 * registered propagators; delegates to the {@link io.opentelemetry.context.propagation.TextMapPropagator}
 * overload.
 *
 * @return the same record, or a replacement copy when its headers were read-only
 */
public static <K, V> ProducerRecord<K, V> propagateContext(
    Context context, ProducerRecord<K, V> record) {
  return propagateContext(
      GlobalOpenTelemetry.getPropagators().getTextMapPropagator(), context, record);
}

public static <K, V> ProducerRecord<K, V> propagateContext(
TextMapPropagator propagator, Context context, ProducerRecord<K, V> record) {
try {
inject(context, record);
inject(propagator, context, record);
} catch (IllegalStateException e) {
// headers must be read-only from reused record. try again with new one.
record =
Expand All @@ -63,15 +70,14 @@ record =
record.value(),
record.headers());

inject(context, record);
inject(propagator, context, record);
}
return record;
}

private static <K, V> void inject(Context context, ProducerRecord<K, V> record) {
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(context, record.headers(), SETTER);
// Writes the context's propagation fields into the record's headers via SETTER.
// May throw IllegalStateException when the headers are read-only (a reused record);
// the caller catches that and retries with a fresh record.
private static <K, V> void inject(
    TextMapPropagator propagator, Context context, ProducerRecord<K, V> record) {
  propagator.inject(context, record.headers(), SETTER);
}

private KafkaPropagation() {}
Expand Down
Loading