From 9094090e6c87118aed0505aeb90c3a1360fee8e5 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Wed, 3 Jun 2026 11:29:57 -0700 Subject: [PATCH 1/5] Add messaging exception event extractor helpers --- .../MessagingExceptionEventExtractors.java | 93 +++++++++++++++++++ .../common/v1_1/JmsInstrumenterFactory.java | 16 +++- .../internal/KafkaInstrumenterFactory.java | 49 +++++----- .../v2_6/KafkaConnectSingletons.java | 23 +++-- .../internal/NatsInstrumenterFactory.java | 25 +++-- .../v2_8/telemetry/PulsarSingletons.java | 28 ++++-- .../rabbitmq/v2_7/RabbitSingletons.java | 55 ++++++----- .../v4_8/RocketMqInstrumenterFactory.java | 4 + .../v5_0/RocketMqInstrumenterFactory.java | 7 ++ .../SpringIntegrationTelemetryBuilder.java | 44 +++++---- .../pulsar/v1_0/SpringPulsarSingletons.java | 3 + .../rabbit/v1_0/SpringRabbitSingletons.java | 19 ++-- 12 files changed, 260 insertions(+), 106 deletions(-) create mode 100644 instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractors.java diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractors.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractors.java new file mode 100644 index 000000000000..ab8885c5a1f7 --- /dev/null +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractors.java @@ -0,0 +1,93 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.internal.Experimental; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class MessagingExceptionEventExtractors { + + /** + * Configures the messaging create exception event name and severity. Only takes effect when + * emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview} + * flag. + */ + @CanIgnoreReturnValue + public static + InstrumenterBuilder setMessagingCreateExceptionEventExtractor( + InstrumenterBuilder builder) { + return setExceptionEventExtractor(builder, "messaging.create.exception", Severity.WARN); + } + + /** + * Configures the messaging send exception event name and severity. Only takes effect when + * emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview} + * flag. + */ + @CanIgnoreReturnValue + public static + InstrumenterBuilder setMessagingSendExceptionEventExtractor( + InstrumenterBuilder builder) { + return setExceptionEventExtractor(builder, "messaging.send.exception", Severity.WARN); + } + + /** + * Configures the messaging receive exception event name and severity. Only takes effect when + * emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview} + * flag. + */ + @CanIgnoreReturnValue + public static + InstrumenterBuilder setMessagingReceiveExceptionEventExtractor( + InstrumenterBuilder builder) { + return setExceptionEventExtractor(builder, "messaging.receive.exception", Severity.WARN); + } + + /** + * Configures the messaging settle exception event name and severity. Only takes effect when + * emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview} + * flag. + */ + @CanIgnoreReturnValue + public static + InstrumenterBuilder setMessagingSettleExceptionEventExtractor( + InstrumenterBuilder builder) { + return setExceptionEventExtractor(builder, "messaging.settle.exception", Severity.WARN); + } + + /** + * Configures the messaging process exception event name and severity. Only takes effect when + * emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview} + * flag. + */ + @CanIgnoreReturnValue + public static + InstrumenterBuilder setMessagingProcessExceptionEventExtractor( + InstrumenterBuilder builder) { + return setExceptionEventExtractor(builder, "messaging.process.exception", Severity.ERROR); + } + + @CanIgnoreReturnValue + private static + InstrumenterBuilder setExceptionEventExtractor( + InstrumenterBuilder builder, String eventName, Severity severity) { + Experimental.setExceptionEventExtractor( + builder, + (logRecordBuilder, context, request) -> { + logRecordBuilder.setEventName(eventName); + logRecordBuilder.setSeverity(severity); + }); + return builder; + } + + private MessagingExceptionEventExtractors() {} +} diff --git a/instrumentation/jms/jms-common-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/common/v1_1/JmsInstrumenterFactory.java b/instrumentation/jms/jms-common-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/common/v1_1/JmsInstrumenterFactory.java index cb20cbd09843..9255dafe3391 100644 --- a/instrumentation/jms/jms-common-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/common/v1_1/JmsInstrumenterFactory.java +++ b/instrumentation/jms/jms-common-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/common/v1_1/JmsInstrumenterFactory.java @@ -5,6 +5,9 @@ package io.opentelemetry.javaagent.instrumentation.jms.common.v1_1; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingReceiveExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingSendExceptionEventExtractor; import static java.util.Collections.emptyList; import com.google.errorprone.annotations.CanIgnoreReturnValue; @@ -50,11 +53,12 @@ public Instrumenter createProducerInstrumenter() { JmsMessageAttributesGetter getter = new JmsMessageAttributesGetter(); MessageOperation operation = MessageOperation.PUBLISH; - return Instrumenter.builder( - openTelemetry, - instrumentationName, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(createMessagingAttributesExtractor(operation)) + return setMessagingSendExceptionEventExtractor( + Instrumenter.builder( + openTelemetry, + instrumentationName, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor(createMessagingAttributesExtractor(operation))) .buildProducerInstrumenter(new MessagePropertySetter()); } @@ -68,6 +72,7 @@ public Instrumenter createConsumerReceiveInstrumen instrumentationName, MessagingSpanNameExtractor.create(getter, operation)) .addAttributesExtractor(createMessagingAttributesExtractor(operation)); + setMessagingReceiveExceptionEventExtractor(builder); if (messagingReceiveInstrumentationEnabled) { builder.addSpanLinksExtractor( new PropagatorBasedSpanLinksExtractor<>( @@ -88,6 +93,7 @@ public Instrumenter createConsumerProcessInstrumen instrumentationName, MessagingSpanNameExtractor.create(getter, operation)) .addAttributesExtractor(createMessagingAttributesExtractor(operation)); + setMessagingProcessExceptionEventExtractor(builder); if (canHaveReceiveInstrumentation && messagingReceiveInstrumentationEnabled) { builder.addSpanLinksExtractor( new PropagatorBasedSpanLinksExtractor<>( diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java index 7f2e88998da9..b80212c9fe2c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java @@ -5,6 +5,9 @@ package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingReceiveExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingSendExceptionEventExtractor; import static java.util.Collections.emptyList; import com.google.errorprone.annotations.CanIgnoreReturnValue; @@ -91,6 +94,7 @@ public Instrumenter createProducerInstrume if (captureExperimentalSpanAttributes) { builder.addAttributesExtractor(new KafkaProducerExperimentalAttributesExtractor()); } + setMessagingSendExceptionEventExtractor(builder); return builder.buildInstrumenter(SpanKindExtractor.alwaysProducer()); } @@ -103,16 +107,17 @@ public Instrumenter createConsumerReceiveInstrumenter KafkaReceiveAttributesGetter getter = new KafkaReceiveAttributesGetter(); MessageOperation operation = MessageOperation.RECEIVE; - return Instrumenter.builder( - openTelemetry, - instrumentationName, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor( - buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) - .addAttributesExtractor(new KafkaReceiveAttributesExtractor()) - .addAttributesExtractors(extractors) - .setErrorCauseExtractor(errorCauseExtractor) - .setEnabled(messagingReceiveInstrumentationEnabled) + return setMessagingReceiveExceptionEventExtractor( + Instrumenter.builder( + openTelemetry, + instrumentationName, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) + .addAttributesExtractor(new KafkaReceiveAttributesExtractor()) + .addAttributesExtractors(extractors) + .setErrorCauseExtractor(errorCauseExtractor) + .setEnabled(messagingReceiveInstrumentationEnabled)) .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } @@ -138,6 +143,7 @@ public Instrumenter createConsumerProcessInstrumenter if (captureExperimentalSpanAttributes) { builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor()); } + setMessagingProcessExceptionEventExtractor(builder); if (messagingReceiveInstrumentationEnabled) { builder.addSpanLinksExtractor( @@ -154,17 +160,18 @@ public Instrumenter createBatchProcessInstrumenter() KafkaReceiveAttributesGetter getter = new KafkaReceiveAttributesGetter(); MessageOperation operation = MessageOperation.PROCESS; - return Instrumenter.builder( - openTelemetry, - instrumentationName, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor( - buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) - .addAttributesExtractor(new KafkaReceiveAttributesExtractor()) - .addSpanLinksExtractor( - new KafkaBatchProcessSpanLinksExtractor( - openTelemetry.getPropagators().getTextMapPropagator())) - .setErrorCauseExtractor(errorCauseExtractor) + return setMessagingProcessExceptionEventExtractor( + Instrumenter.builder( + openTelemetry, + instrumentationName, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) + .addAttributesExtractor(new KafkaReceiveAttributesExtractor()) + .addSpanLinksExtractor( + new KafkaBatchProcessSpanLinksExtractor( + openTelemetry.getPropagators().getTextMapPropagator())) + .setErrorCauseExtractor(errorCauseExtractor)) .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java index 1d348e4b0b9e..82de93078984 100644 --- a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java +++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java @@ -5,6 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor; + import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; @@ -26,16 +28,17 @@ public class KafkaConnectSingletons { new KafkaConnectBatchProcessSpanLinksExtractor(propagator); instrumenter = - Instrumenter.builder( - GlobalOpenTelemetry.get(), - INSTRUMENTATION_NAME, - MessagingSpanNameExtractor.create( - new KafkaConnectAttributesGetter(), MessageOperation.PROCESS)) - .addAttributesExtractor( - MessagingAttributesExtractor.builder( - new KafkaConnectAttributesGetter(), MessageOperation.PROCESS) - .build()) - .addSpanLinksExtractor(spanLinksExtractor) + setMessagingProcessExceptionEventExtractor( + Instrumenter.builder( + GlobalOpenTelemetry.get(), + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create( + new KafkaConnectAttributesGetter(), MessageOperation.PROCESS)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder( + new KafkaConnectAttributesGetter(), MessageOperation.PROCESS) + .build()) + .addSpanLinksExtractor(spanLinksExtractor)) .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java index 4037dd6c5e6b..e6665c0ee081 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java @@ -5,6 +5,9 @@ package io.opentelemetry.instrumentation.nats.v2_17.internal; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingSendExceptionEventExtractor; + import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; @@ -22,16 +25,17 @@ public final class NatsInstrumenterFactory { public static Instrumenter createProducerInstrumenter( OpenTelemetry openTelemetry, List capturedHeaders) { - return Instrumenter.builder( - openTelemetry, - INSTRUMENTATION_NAME, - MessagingSpanNameExtractor.create( - new NatsRequestMessagingAttributesGetter(), MessageOperation.PUBLISH)) - .addAttributesExtractor( - MessagingAttributesExtractor.builder( - new NatsRequestMessagingAttributesGetter(), MessageOperation.PUBLISH) - .setCapturedHeaders(capturedHeaders) - .build()) + return setMessagingSendExceptionEventExtractor( + Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create( + new NatsRequestMessagingAttributesGetter(), MessageOperation.PUBLISH)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder( + new NatsRequestMessagingAttributesGetter(), MessageOperation.PUBLISH) + .setCapturedHeaders(capturedHeaders) + .build())) .buildProducerInstrumenter(new NatsRequestTextMapSetter()); } @@ -48,6 +52,7 @@ public static Instrumenter createConsumerProcessInstrumenter( new NatsRequestMessagingAttributesGetter(), MessageOperation.PROCESS) .setCapturedHeaders(capturedHeaders) .build()); + setMessagingProcessExceptionEventExtractor(builder); return builder.buildConsumerInstrumenter(new NatsRequestTextMapGetter()); } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 0a93492e2ac9..0416da653655 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -5,6 +5,10 @@ package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingReceiveExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingSendExceptionEventExtractor; + import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.context.Context; @@ -77,6 +81,7 @@ private static Instrumenter createConsumerReceiveInstrument .addOperationMetrics(MessagingConsumerMetrics.get()) .addAttributesExtractor( ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())); + setMessagingReceiveExceptionEventExtractor(instrumenterBuilder); if (receiveInstrumentationEnabled) { return instrumenterBuilder @@ -91,16 +96,17 @@ private static Instrumenter createConsumerBatchReceive MessagingAttributesGetter getter = new PulsarBatchMessagingAttributesGetter(); - return Instrumenter.builder( - telemetry, - INSTRUMENTATION_NAME, - MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE)) - .addAttributesExtractor( - createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE)) - .addAttributesExtractor( - ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) - .addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(propagator)) - .addOperationMetrics(MessagingConsumerMetrics.get()) + return setMessagingReceiveExceptionEventExtractor( + Instrumenter.builder( + telemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE)) + .addAttributesExtractor( + createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE)) + .addAttributesExtractor( + ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) + .addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(propagator)) + .addOperationMetrics(MessagingConsumerMetrics.get())) .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } @@ -114,6 +120,7 @@ private static Instrumenter createConsumerProcessInstrument MessagingSpanNameExtractor.create(getter, MessageOperation.PROCESS)) .addAttributesExtractor( createMessagingAttributesExtractor(getter, MessageOperation.PROCESS)); + setMessagingProcessExceptionEventExtractor(instrumenterBuilder); if (receiveInstrumentationEnabled) { SpanLinksExtractor spanLinksExtractor = @@ -142,6 +149,7 @@ private static Instrumenter createProducerInstrumenter() { .getBoolean("experimental_span_attributes/development", false)) { builder.addAttributesExtractor(new ExperimentalProducerAttributesExtractor()); } + setMessagingSendExceptionEventExtractor(builder); return builder.buildProducerInstrumenter(new MessageTextMapSetter()); } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/v2_7/RabbitSingletons.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/v2_7/RabbitSingletons.java index dbd0e4c188ea..f87221a5b2be 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/v2_7/RabbitSingletons.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/v2_7/RabbitSingletons.java @@ -7,6 +7,9 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT; import static io.opentelemetry.api.trace.SpanKind.PRODUCER; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingReceiveExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingSendExceptionEventExtractor; import com.rabbitmq.client.GetResponse; import io.opentelemetry.api.GlobalOpenTelemetry; @@ -16,6 +19,7 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor; import io.opentelemetry.instrumentation.api.semconv.network.NetworkAttributesExtractor; @@ -54,17 +58,22 @@ static Instrumenter deliverInstrumenter() { } private static Instrumenter createChannelInstrumenter(boolean publish) { - return Instrumenter.builder( - GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, ChannelAndMethod::getMethod) - .addAttributesExtractor( - buildMessagingAttributesExtractor( - new RabbitChannelAttributesGetter(), publish ? MessageOperation.PUBLISH : null)) - .addAttributesExtractor( - NetworkAttributesExtractor.create(new RabbitChannelNetAttributesGetter())) - .addContextCustomizer( - (context, request, startAttributes) -> - context.with(CHANNEL_AND_METHOD_CONTEXT_KEY, new RabbitChannelAndMethodHolder())) - .buildInstrumenter(channelAndMethod -> publish ? PRODUCER : CLIENT); + InstrumenterBuilder builder = + Instrumenter.builder( + GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, ChannelAndMethod::getMethod) + .addAttributesExtractor( + buildMessagingAttributesExtractor( + new RabbitChannelAttributesGetter(), publish ? MessageOperation.PUBLISH : null)) + .addAttributesExtractor( + NetworkAttributesExtractor.create(new RabbitChannelNetAttributesGetter())) + .addContextCustomizer( + (context, request, startAttributes) -> + context.with( + CHANNEL_AND_METHOD_CONTEXT_KEY, new RabbitChannelAndMethodHolder())); + if (publish) { + setMessagingSendExceptionEventExtractor(builder); + } + return builder.buildInstrumenter(channelAndMethod -> publish ? PRODUCER : CLIENT); } private static Instrumenter createReceiveInstrumenter() { @@ -77,14 +86,15 @@ private static Instrumenter createReceiveInstrument extractors.add(new RabbitReceiveExperimentalAttributesExtractor()); } - return Instrumenter.builder( - GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, ReceiveRequest::spanName) - .addAttributesExtractors(extractors) - .setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) - .addSpanLinksExtractor( - new PropagatorBasedSpanLinksExtractor<>( - GlobalOpenTelemetry.getPropagators().getTextMapPropagator(), - new ReceiveRequestTextMapGetter())) + return setMessagingReceiveExceptionEventExtractor( + Instrumenter.builder( + GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, ReceiveRequest::spanName) + .addAttributesExtractors(extractors) + .setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) + .addSpanLinksExtractor( + new PropagatorBasedSpanLinksExtractor<>( + GlobalOpenTelemetry.getPropagators().getTextMapPropagator(), + new ReceiveRequestTextMapGetter()))) .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } @@ -99,9 +109,10 @@ private static Instrumenter createDeliverInstrumenter() { extractors.add(new RabbitDeliveryExperimentalAttributesExtractor()); } - return Instrumenter.builder( - GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, DeliveryRequest::spanName) - .addAttributesExtractors(extractors) + return setMessagingProcessExceptionEventExtractor( + Instrumenter.builder( + GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, DeliveryRequest::spanName) + .addAttributesExtractors(extractors)) .buildConsumerInstrumenter(new DeliveryRequestGetter()); } diff --git a/instrumentation/rocketmq/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqInstrumenterFactory.java b/instrumentation/rocketmq/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqInstrumenterFactory.java index ec5c425a0749..ef611cf66212 100644 --- a/instrumentation/rocketmq/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqInstrumenterFactory.java +++ b/instrumentation/rocketmq/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqInstrumenterFactory.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.rocketmqclient.v4_8; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingSendExceptionEventExtractor; import static io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor.constant; import io.opentelemetry.api.OpenTelemetry; @@ -52,6 +54,7 @@ static Instrumenter createProducerInstrumenter( instrumenterBuilder.addAttributesExtractor( new RocketMqProducerExperimentalAttributeExtractor()); } + setMessagingSendExceptionEventExtractor(instrumenterBuilder); return instrumenterBuilder.buildProducerInstrumenter(new MapSetter()); } @@ -95,6 +98,7 @@ private static Instrumenter createProcessInstrumenter( if (captureExperimentalSpanAttributes) { builder.addAttributesExtractor(new RocketMqConsumerExperimentalAttributeExtractor()); } + setMessagingProcessExceptionEventExtractor(builder); if (batch) { SpanLinksExtractor spanLinksExtractor = diff --git a/instrumentation/rocketmq/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java b/instrumentation/rocketmq/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java index cd7edba46fc3..bb6ce5d1d276 100644 --- a/instrumentation/rocketmq/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java +++ b/instrumentation/rocketmq/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java @@ -5,6 +5,10 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingReceiveExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingSendExceptionEventExtractor; + import apache.rocketmq.v2.ReceiveMessageRequest; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.StatusCode; @@ -45,6 +49,7 @@ public static Instrumenter createProduce MessagingSpanNameExtractor.create(getter, operation)) .addAttributesExtractor(attributesExtractor) .addAttributesExtractor(new RocketMqProducerAttributeExtractor()); + setMessagingSendExceptionEventExtractor(instrumenterBuilder); return instrumenterBuilder.buildProducerInstrumenter(new MessageMapSetter()); } @@ -65,6 +70,7 @@ public static Instrumenter createProduce .setEnabled(enabled) .addAttributesExtractor(attributesExtractor) .addAttributesExtractor(new RocketMqConsumerReceiveAttributeExtractor()); + setMessagingReceiveExceptionEventExtractor(instrumenterBuilder); return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } @@ -94,6 +100,7 @@ public static Instrumenter createConsumerProcessInst .extract(spanStatusBuilder, messageView, consumeResult, error); } }); + setMessagingProcessExceptionEventExtractor(instrumenterBuilder); if (receiveInstrumentationEnabled) { SpanLinksExtractor spanLinksExtractor = diff --git a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryBuilder.java b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryBuilder.java index e5ec2d41cb93..31dcf102aa56 100644 --- a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryBuilder.java +++ b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryBuilder.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.spring.integration.v4_1; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingSendExceptionEventExtractor; import static java.util.Collections.emptyList; import com.google.errorprone.annotations.CanIgnoreReturnValue; @@ -73,29 +75,31 @@ public SpringIntegrationTelemetryBuilder setProducerSpanEnabled(boolean producer */ public SpringIntegrationTelemetry build() { Instrumenter consumerInstrumenter = - Instrumenter.builder( - openTelemetry, - INSTRUMENTATION_NAME, - SpringIntegrationTelemetryBuilder::consumerSpanName) - .addAttributesExtractors(additionalAttributeExtractors) - .addAttributesExtractor( - buildMessagingAttributesExtractor( - new SpringMessagingAttributesGetter(), - MessageOperation.PROCESS, - capturedHeaders)) + setMessagingProcessExceptionEventExtractor( + Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + SpringIntegrationTelemetryBuilder::consumerSpanName) + .addAttributesExtractors(additionalAttributeExtractors) + .addAttributesExtractor( + buildMessagingAttributesExtractor( + new SpringMessagingAttributesGetter(), + MessageOperation.PROCESS, + capturedHeaders))) .buildConsumerInstrumenter(MessageHeadersGetter.INSTANCE); Instrumenter producerInstrumenter = - Instrumenter.builder( - openTelemetry, - INSTRUMENTATION_NAME, - SpringIntegrationTelemetryBuilder::producerSpanName) - .addAttributesExtractors(additionalAttributeExtractors) - .addAttributesExtractor( - buildMessagingAttributesExtractor( - new SpringMessagingAttributesGetter(), - MessageOperation.PUBLISH, - capturedHeaders)) + setMessagingSendExceptionEventExtractor( + Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + SpringIntegrationTelemetryBuilder::producerSpanName) + .addAttributesExtractors(additionalAttributeExtractors) + .addAttributesExtractor( + buildMessagingAttributesExtractor( + new SpringMessagingAttributesGetter(), + MessageOperation.PUBLISH, + capturedHeaders))) .buildInstrumenter(SpanKindExtractor.alwaysProducer()); return new SpringIntegrationTelemetry( openTelemetry.getPropagators(), diff --git a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSingletons.java b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSingletons.java index 6edbb84a1a98..1f9db736d418 100644 --- a/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSingletons.java +++ b/instrumentation/spring/spring-pulsar-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_0/SpringPulsarSingletons.java @@ -5,6 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor; + import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; @@ -37,6 +39,7 @@ public class SpringPulsarSingletons { MessagingAttributesExtractor.builder(getter, operation) .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) .build()); + setMessagingProcessExceptionEventExtractor(builder); if (messagingReceiveInstrumentationEnabled) { builder.addSpanLinksExtractor( new PropagatorBasedSpanLinksExtractor<>( diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/SpringRabbitSingletons.java b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/SpringRabbitSingletons.java index 6abdf47f7348..81b20ead0cf2 100644 --- a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/SpringRabbitSingletons.java +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/SpringRabbitSingletons.java @@ -5,6 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.spring.rabbit.v1_0; +import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor; + import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; @@ -24,14 +26,15 @@ public class SpringRabbitSingletons { MessageOperation operation = MessageOperation.PROCESS; instrumenter = - Instrumenter.builder( - GlobalOpenTelemetry.get(), - INSTRUMENTATION_NAME, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor( - MessagingAttributesExtractor.builder(getter, operation) - .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) - .build()) + setMessagingProcessExceptionEventExtractor( + Instrumenter.builder( + GlobalOpenTelemetry.get(), + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) + .build())) .buildConsumerInstrumenter(new MessageHeaderGetter()); } From 006ce3b47408357d3cfa402579c19294db0495d3 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Wed, 3 Jun 2026 17:28:02 -0700 Subject: [PATCH 2/5] Add unit and integration tests for messaging exception event extractors --- ...MessagingExceptionEventExtractorsTest.java | 100 ++++++++++++++++++ .../common/v1_1/JmsInstrumenterFactory.java | 15 +-- .../library/build.gradle.kts | 17 ++- .../kafkaclients/v2_6/WrapperTest.java | 54 ++++++++-- .../internal/KafkaInstrumenterFactory.java | 52 ++++----- .../v2_6/KafkaConnectSingletons.java | 28 ++--- .../internal/NatsInstrumenterFactory.java | 25 ++--- .../v2_8/telemetry/PulsarSingletons.java | 25 ++--- .../rabbitmq/v2_7/RabbitSingletons.java | 32 +++--- .../SpringIntegrationTelemetryBuilder.java | 51 ++++----- .../rabbit/v1_0/SpringRabbitSingletons.java | 24 +++-- 11 files changed, 293 insertions(+), 130 deletions(-) create mode 100644 instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractorsTest.java diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractorsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractorsTest.java new file mode 100644 index 000000000000..cdc87e665f80 --- /dev/null +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractorsTest.java @@ -0,0 +1,100 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal; + +import static io.opentelemetry.instrumentation.api.internal.SemconvExceptionSignal.emitExceptionAsLogs; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_MESSAGE; +import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_STACKTRACE; +import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_TYPE; + +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; +import java.util.List; +import java.util.function.Consumer; +import org.assertj.core.api.AbstractAssert; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class MessagingExceptionEventExtractorsTest { + + @RegisterExtension + static final OpenTelemetryExtension otelTesting = OpenTelemetryExtension.create(); + + @Test + void messagingCreateExceptionLog() { + assertExceptionLog( + MessagingExceptionEventExtractors::setMessagingCreateExceptionEventExtractor, + "messaging.create.exception", + Severity.WARN); + } + + @Test + void messagingSendExceptionLog() { + assertExceptionLog( + MessagingExceptionEventExtractors::setMessagingSendExceptionEventExtractor, + "messaging.send.exception", + Severity.WARN); + } + + @Test + void messagingReceiveExceptionLog() { + assertExceptionLog( + MessagingExceptionEventExtractors::setMessagingReceiveExceptionEventExtractor, + "messaging.receive.exception", + Severity.WARN); + } + + @Test + void messagingSettleExceptionLog() { + assertExceptionLog( + MessagingExceptionEventExtractors::setMessagingSettleExceptionEventExtractor, + "messaging.settle.exception", + Severity.WARN); + } + + @Test + void messagingProcessExceptionLog() { + assertExceptionLog( + MessagingExceptionEventExtractors::setMessagingProcessExceptionEventExtractor, + "messaging.process.exception", + Severity.ERROR); + } + + private static void assertExceptionLog( + Consumer> configure, + String expectedEventName, + Severity expectedSeverity) { + InstrumenterBuilder builder = + Instrumenter.builder(otelTesting.getOpenTelemetry(), "test", unused -> "span"); + configure.accept(builder); + Instrumenter instrumenter = builder.buildInstrumenter(); + + Context context = instrumenter.start(Context.root(), "request"); + IllegalStateException error = new IllegalStateException("test"); + instrumenter.end(context, "request", "response", error); + + List logs = otelTesting.getLogRecords(); + if (emitExceptionAsLogs()) { + assertThat(logs).hasSize(1); + assertThat(logs.get(0)) + .hasSeverity(expectedSeverity) + .hasEventName(expectedEventName) + .hasAttributesSatisfyingExactly( + equalTo(EXCEPTION_TYPE, "java.lang.IllegalStateException"), + equalTo(EXCEPTION_MESSAGE, "test"), + satisfies(EXCEPTION_STACKTRACE, AbstractAssert::isNotNull)); + } else { + assertThat(logs).isEmpty(); + } + } +} diff --git a/instrumentation/jms/jms-common-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/common/v1_1/JmsInstrumenterFactory.java b/instrumentation/jms/jms-common-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/common/v1_1/JmsInstrumenterFactory.java index 9255dafe3391..21cd1f4b198b 100644 --- a/instrumentation/jms/jms-common-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/common/v1_1/JmsInstrumenterFactory.java +++ b/instrumentation/jms/jms-common-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/common/v1_1/JmsInstrumenterFactory.java @@ -53,13 +53,14 @@ public Instrumenter createProducerInstrumenter() { JmsMessageAttributesGetter getter = new JmsMessageAttributesGetter(); MessageOperation operation = MessageOperation.PUBLISH; - return setMessagingSendExceptionEventExtractor( - Instrumenter.builder( - openTelemetry, - instrumentationName, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(createMessagingAttributesExtractor(operation))) - .buildProducerInstrumenter(new MessagePropertySetter()); + InstrumenterBuilder builder = + Instrumenter.builder( + openTelemetry, + instrumentationName, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor(createMessagingAttributesExtractor(operation)); + setMessagingSendExceptionEventExtractor(builder); + return builder.buildProducerInstrumenter(new MessagePropertySetter()); } public Instrumenter createConsumerReceiveInstrumenter() { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts index 31d518aed909..05c370e0914d 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts @@ -17,11 +17,26 @@ dependencies { } tasks { - test { + withType().configureEach { usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) systemProperty("testLatestDeps", otelProps.testLatestDeps) systemProperty("collectMetadata", otelProps.collectMetadata) } + + val testExceptionSignalLogs by registering(Test::class) { + testClassesDirs = sourceSets.test.get().output.classesDirs + classpath = sourceSets.test.get().runtimeClasspath + + filter { + includeTestsMatching("WrapperTest") + } + jvmArgs("-Dotel.semconv.exception.signal.preview=logs") + systemProperty("metadataConfig", "otel.semconv.exception.signal.preview=logs") + } + + check { + dependsOn(testExceptionSignalLogs) + } } // kafka 4.1 requires java 11 diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java index f77656fb6bfe..01cc602b3082 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java @@ -7,8 +7,13 @@ import static io.opentelemetry.api.common.AttributeKey.longKey; import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.instrumentation.api.internal.SemconvExceptionSignal.emitExceptionAsLogs; +import static io.opentelemetry.instrumentation.api.internal.SemconvExceptionSignal.emitExceptionAsSpanEvents; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_STACKTRACE; +import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_TYPE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID; @@ -20,14 +25,17 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.opentelemetry.api.logs.Severity; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.message.MessageHeaderUtil; +import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.StatusData; @@ -38,6 +46,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.assertj.core.api.AbstractLongAssert; import org.assertj.core.api.AbstractStringAssert; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; @SuppressWarnings("deprecation") // using deprecated semconv @@ -177,15 +186,40 @@ void testConsumerError() { testing.waitAndAssertTraces( trace -> trace.hasSpansSatisfyingExactly( - span -> - span.hasName("unknown receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasStatus(StatusData.error()) - .hasException(new IllegalStateException()) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_SYSTEM, "kafka"), - equalTo(MESSAGING_OPERATION, "receive"), - equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 0)))); + span -> { + span.hasName("unknown receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_SYSTEM, "kafka"), + equalTo(MESSAGING_OPERATION, "receive"), + equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 0)); + if (emitExceptionAsSpanEvents()) { + span.hasException(new IllegalStateException()); + } + })); + + if (emitExceptionAsLogs()) { + assertReceiveExceptionLog(); + } + } + + private static void assertReceiveExceptionLog() { + Awaitility.await() + .untilAsserted( + () -> { + List logs = + testing.logRecords().stream() + .filter(log -> "messaging.receive.exception".equals(log.getEventName())) + .collect(toList()); + assertThat(logs).hasSize(1); + assertThat(logs.get(0)) + .hasSeverity(Severity.WARN) + .hasEventName("messaging.receive.exception") + .hasAttributesSatisfyingExactly( + satisfies(EXCEPTION_TYPE, val -> val.isNotNull()), + satisfies(EXCEPTION_STACKTRACE, val -> val.isNotNull())); + }); } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java index b80212c9fe2c..3a0654f4b34c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaInstrumenterFactory.java @@ -107,18 +107,19 @@ public Instrumenter createConsumerReceiveInstrumenter KafkaReceiveAttributesGetter getter = new KafkaReceiveAttributesGetter(); MessageOperation operation = MessageOperation.RECEIVE; - return setMessagingReceiveExceptionEventExtractor( - Instrumenter.builder( - openTelemetry, - instrumentationName, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor( - buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) - .addAttributesExtractor(new KafkaReceiveAttributesExtractor()) - .addAttributesExtractors(extractors) - .setErrorCauseExtractor(errorCauseExtractor) - .setEnabled(messagingReceiveInstrumentationEnabled)) - .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + InstrumenterBuilder builder = + Instrumenter.builder( + openTelemetry, + instrumentationName, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) + .addAttributesExtractor(new KafkaReceiveAttributesExtractor()) + .addAttributesExtractors(extractors) + .setErrorCauseExtractor(errorCauseExtractor) + .setEnabled(messagingReceiveInstrumentationEnabled); + setMessagingReceiveExceptionEventExtractor(builder); + return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } public Instrumenter createConsumerProcessInstrumenter() { @@ -160,19 +161,20 @@ public Instrumenter createBatchProcessInstrumenter() KafkaReceiveAttributesGetter getter = new KafkaReceiveAttributesGetter(); MessageOperation operation = MessageOperation.PROCESS; - return setMessagingProcessExceptionEventExtractor( - Instrumenter.builder( - openTelemetry, - instrumentationName, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor( - buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) - .addAttributesExtractor(new KafkaReceiveAttributesExtractor()) - .addSpanLinksExtractor( - new KafkaBatchProcessSpanLinksExtractor( - openTelemetry.getPropagators().getTextMapPropagator())) - .setErrorCauseExtractor(errorCauseExtractor)) - .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + InstrumenterBuilder builder = + Instrumenter.builder( + openTelemetry, + instrumentationName, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) + .addAttributesExtractor(new KafkaReceiveAttributesExtractor()) + .addSpanLinksExtractor( + new KafkaBatchProcessSpanLinksExtractor( + openTelemetry.getPropagators().getTextMapPropagator())) + .setErrorCauseExtractor(errorCauseExtractor); + setMessagingProcessExceptionEventExtractor(builder); + return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } private static diff --git a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java index 82de93078984..21d1fa944a28 100644 --- a/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java +++ b/instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectSingletons.java @@ -13,6 +13,7 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; public class KafkaConnectSingletons { @@ -27,19 +28,20 @@ public class KafkaConnectSingletons { KafkaConnectBatchProcessSpanLinksExtractor spanLinksExtractor = new KafkaConnectBatchProcessSpanLinksExtractor(propagator); - instrumenter = - setMessagingProcessExceptionEventExtractor( - Instrumenter.builder( - GlobalOpenTelemetry.get(), - INSTRUMENTATION_NAME, - MessagingSpanNameExtractor.create( - new KafkaConnectAttributesGetter(), MessageOperation.PROCESS)) - .addAttributesExtractor( - MessagingAttributesExtractor.builder( - new KafkaConnectAttributesGetter(), MessageOperation.PROCESS) - .build()) - .addSpanLinksExtractor(spanLinksExtractor)) - .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + InstrumenterBuilder builder = + Instrumenter.builder( + GlobalOpenTelemetry.get(), + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create( + new KafkaConnectAttributesGetter(), MessageOperation.PROCESS)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder( + new KafkaConnectAttributesGetter(), MessageOperation.PROCESS) + .build()) + .addSpanLinksExtractor(spanLinksExtractor); + setMessagingProcessExceptionEventExtractor(builder); + + instrumenter = builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } public static Instrumenter instrumenter() { diff --git a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java index e6665c0ee081..3faa961994b5 100644 --- a/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java +++ b/instrumentation/nats/nats-2.17/library/src/main/java/io/opentelemetry/instrumentation/nats/v2_17/internal/NatsInstrumenterFactory.java @@ -25,18 +25,19 @@ public final class NatsInstrumenterFactory { public static Instrumenter createProducerInstrumenter( OpenTelemetry openTelemetry, List capturedHeaders) { - return setMessagingSendExceptionEventExtractor( - Instrumenter.builder( - openTelemetry, - INSTRUMENTATION_NAME, - MessagingSpanNameExtractor.create( - new NatsRequestMessagingAttributesGetter(), MessageOperation.PUBLISH)) - .addAttributesExtractor( - MessagingAttributesExtractor.builder( - new NatsRequestMessagingAttributesGetter(), MessageOperation.PUBLISH) - .setCapturedHeaders(capturedHeaders) - .build())) - .buildProducerInstrumenter(new NatsRequestTextMapSetter()); + InstrumenterBuilder builder = + Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create( + new NatsRequestMessagingAttributesGetter(), MessageOperation.PUBLISH)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder( + new NatsRequestMessagingAttributesGetter(), MessageOperation.PUBLISH) + .setCapturedHeaders(capturedHeaders) + .build()); + setMessagingSendExceptionEventExtractor(builder); + return builder.buildProducerInstrumenter(new NatsRequestTextMapSetter()); } public static Instrumenter createConsumerProcessInstrumenter( diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 0416da653655..e3a9b50dc7c2 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -96,18 +96,19 @@ private static Instrumenter createConsumerBatchReceive MessagingAttributesGetter getter = new PulsarBatchMessagingAttributesGetter(); - return setMessagingReceiveExceptionEventExtractor( - Instrumenter.builder( - telemetry, - INSTRUMENTATION_NAME, - MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE)) - .addAttributesExtractor( - createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE)) - .addAttributesExtractor( - ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) - .addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(propagator)) - .addOperationMetrics(MessagingConsumerMetrics.get())) - .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + InstrumenterBuilder instrumenterBuilder = + Instrumenter.builder( + telemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE)) + .addAttributesExtractor( + createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE)) + .addAttributesExtractor( + ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) + .addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(propagator)) + .addOperationMetrics(MessagingConsumerMetrics.get()); + setMessagingReceiveExceptionEventExtractor(instrumenterBuilder); + return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } private static Instrumenter createConsumerProcessInstrumenter() { diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/v2_7/RabbitSingletons.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/v2_7/RabbitSingletons.java index f87221a5b2be..7664c4a358f4 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/v2_7/RabbitSingletons.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/v2_7/RabbitSingletons.java @@ -86,16 +86,17 @@ private static Instrumenter createReceiveInstrument extractors.add(new RabbitReceiveExperimentalAttributesExtractor()); } - return setMessagingReceiveExceptionEventExtractor( - Instrumenter.builder( - GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, ReceiveRequest::spanName) - .addAttributesExtractors(extractors) - .setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) - .addSpanLinksExtractor( - new PropagatorBasedSpanLinksExtractor<>( - GlobalOpenTelemetry.getPropagators().getTextMapPropagator(), - new ReceiveRequestTextMapGetter()))) - .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + InstrumenterBuilder builder = + Instrumenter.builder( + GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, ReceiveRequest::spanName) + .addAttributesExtractors(extractors) + .setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) + .addSpanLinksExtractor( + new PropagatorBasedSpanLinksExtractor<>( + GlobalOpenTelemetry.getPropagators().getTextMapPropagator(), + new ReceiveRequestTextMapGetter())); + setMessagingReceiveExceptionEventExtractor(builder); + return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } private static Instrumenter createDeliverInstrumenter() { @@ -109,11 +110,12 @@ private static Instrumenter createDeliverInstrumenter() { extractors.add(new RabbitDeliveryExperimentalAttributesExtractor()); } - return setMessagingProcessExceptionEventExtractor( - Instrumenter.builder( - GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, DeliveryRequest::spanName) - .addAttributesExtractors(extractors)) - .buildConsumerInstrumenter(new DeliveryRequestGetter()); + InstrumenterBuilder builder = + Instrumenter.builder( + GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, DeliveryRequest::spanName) + .addAttributesExtractors(extractors); + setMessagingProcessExceptionEventExtractor(builder); + return builder.buildConsumerInstrumenter(new DeliveryRequestGetter()); } private static AttributesExtractor buildMessagingAttributesExtractor( diff --git a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryBuilder.java b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryBuilder.java index 31dcf102aa56..20dea3ca31f7 100644 --- a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryBuilder.java +++ b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryBuilder.java @@ -17,6 +17,7 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import java.util.ArrayList; import java.util.Collection; @@ -74,33 +75,35 @@ public SpringIntegrationTelemetryBuilder setProducerSpanEnabled(boolean producer * SpringIntegrationTelemetryBuilder}. */ public SpringIntegrationTelemetry build() { + InstrumenterBuilder consumerBuilder = + Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + SpringIntegrationTelemetryBuilder::consumerSpanName) + .addAttributesExtractors(additionalAttributeExtractors) + .addAttributesExtractor( + buildMessagingAttributesExtractor( + new SpringMessagingAttributesGetter(), + MessageOperation.PROCESS, + capturedHeaders)); + setMessagingProcessExceptionEventExtractor(consumerBuilder); Instrumenter consumerInstrumenter = - setMessagingProcessExceptionEventExtractor( - Instrumenter.builder( - openTelemetry, - INSTRUMENTATION_NAME, - SpringIntegrationTelemetryBuilder::consumerSpanName) - .addAttributesExtractors(additionalAttributeExtractors) - .addAttributesExtractor( - buildMessagingAttributesExtractor( - new SpringMessagingAttributesGetter(), - MessageOperation.PROCESS, - capturedHeaders))) - .buildConsumerInstrumenter(MessageHeadersGetter.INSTANCE); + consumerBuilder.buildConsumerInstrumenter(MessageHeadersGetter.INSTANCE); + InstrumenterBuilder producerBuilder = + Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + SpringIntegrationTelemetryBuilder::producerSpanName) + .addAttributesExtractors(additionalAttributeExtractors) + .addAttributesExtractor( + buildMessagingAttributesExtractor( + new SpringMessagingAttributesGetter(), + MessageOperation.PUBLISH, + capturedHeaders)); + setMessagingSendExceptionEventExtractor(producerBuilder); Instrumenter producerInstrumenter = - setMessagingSendExceptionEventExtractor( - Instrumenter.builder( - openTelemetry, - INSTRUMENTATION_NAME, - SpringIntegrationTelemetryBuilder::producerSpanName) - .addAttributesExtractors(additionalAttributeExtractors) - .addAttributesExtractor( - buildMessagingAttributesExtractor( - new SpringMessagingAttributesGetter(), - MessageOperation.PUBLISH, - capturedHeaders))) - .buildInstrumenter(SpanKindExtractor.alwaysProducer()); + producerBuilder.buildInstrumenter(SpanKindExtractor.alwaysProducer()); return new SpringIntegrationTelemetry( openTelemetry.getPropagators(), consumerInstrumenter, diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/SpringRabbitSingletons.java b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/SpringRabbitSingletons.java index 81b20ead0cf2..0b1c73f8a742 100644 --- a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/SpringRabbitSingletons.java +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/v1_0/SpringRabbitSingletons.java @@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import org.springframework.amqp.core.Message; @@ -25,17 +26,18 @@ public class SpringRabbitSingletons { SpringRabbitMessageAttributesGetter getter = new SpringRabbitMessageAttributesGetter(); MessageOperation operation = MessageOperation.PROCESS; - instrumenter = - setMessagingProcessExceptionEventExtractor( - Instrumenter.builder( - GlobalOpenTelemetry.get(), - INSTRUMENTATION_NAME, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor( - MessagingAttributesExtractor.builder(getter, operation) - .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) - .build())) - .buildConsumerInstrumenter(new MessageHeaderGetter()); + InstrumenterBuilder builder = + Instrumenter.builder( + GlobalOpenTelemetry.get(), + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) + .build()); + setMessagingProcessExceptionEventExtractor(builder); + + instrumenter = builder.buildConsumerInstrumenter(new MessageHeaderGetter()); } public static Instrumenter instrumenter() { From e5b30b89e10bf77e46838950636be51f755bd6f0 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Fri, 5 Jun 2026 12:02:11 -0700 Subject: [PATCH 3/5] Improve Kafka receive exception log test --- .../kafkaclients/v2_6/WrapperTest.java | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java index 01cc602b3082..c2eb873dfbf4 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java @@ -12,6 +12,7 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_MESSAGE; import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_STACKTRACE; import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_TYPE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; @@ -178,10 +179,10 @@ void testConsumerError() { KafkaTelemetry telemetry = telemetryBuilder.build(); Consumer mockConsumer = mock(); - when(mockConsumer.poll(Duration.ofSeconds(10))).thenThrow(new IllegalStateException()); + IllegalStateException error = new IllegalStateException("test"); + when(mockConsumer.poll(Duration.ofSeconds(10))).thenThrow(error); Consumer wrappedConsumer = telemetry.wrap(mockConsumer); - assertThatThrownBy(() -> wrappedConsumer.poll(Duration.ofSeconds(10))) - .isInstanceOf(IllegalStateException.class); + assertThatThrownBy(() -> wrappedConsumer.poll(Duration.ofSeconds(10))).isSameAs(error); testing.waitAndAssertTraces( trace -> @@ -196,30 +197,31 @@ void testConsumerError() { equalTo(MESSAGING_OPERATION, "receive"), equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 0)); if (emitExceptionAsSpanEvents()) { - span.hasException(new IllegalStateException()); + span.hasException(error); } })); if (emitExceptionAsLogs()) { - assertReceiveExceptionLog(); + Awaitility.await() + .untilAsserted( + () -> { + List logs = + testing.logRecords().stream() + .filter(log -> "messaging.receive.exception".equals(log.getEventName())) + .collect(toList()); + assertThat(logs).hasSize(1); + assertThat(logs.get(0)) + .hasSeverity(Severity.WARN) + .hasEventName("messaging.receive.exception") + .hasAttributesSatisfyingExactly( + equalTo(EXCEPTION_TYPE, error.getClass().getName()), + equalTo(EXCEPTION_MESSAGE, error.getMessage()), + satisfies( + EXCEPTION_STACKTRACE, + val -> + val.contains(error.getClass().getName()) + .contains("WrapperTest.testConsumerError"))); + }); } } - - private static void assertReceiveExceptionLog() { - Awaitility.await() - .untilAsserted( - () -> { - List logs = - testing.logRecords().stream() - .filter(log -> "messaging.receive.exception".equals(log.getEventName())) - .collect(toList()); - assertThat(logs).hasSize(1); - assertThat(logs.get(0)) - .hasSeverity(Severity.WARN) - .hasEventName("messaging.receive.exception") - .hasAttributesSatisfyingExactly( - satisfies(EXCEPTION_TYPE, val -> val.isNotNull()), - satisfies(EXCEPTION_STACKTRACE, val -> val.isNotNull())); - }); - } } From 67d5bbb544ff2efd99a29ddee127c16e68e76189 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Fri, 5 Jun 2026 13:32:58 -0700 Subject: [PATCH 4/5] Simplify Kafka exception log assertion --- .../kafkaclients/v2_6/WrapperTest.java | 56 ++++++------------- 1 file changed, 17 insertions(+), 39 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java index c2eb873dfbf4..c9f784651554 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/WrapperTest.java @@ -12,9 +12,6 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; -import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_MESSAGE; -import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_STACKTRACE; -import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_TYPE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID; @@ -26,7 +23,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; -import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; @@ -36,7 +32,6 @@ import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.message.MessageHeaderUtil; -import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.StatusData; @@ -47,7 +42,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.assertj.core.api.AbstractLongAssert; import org.assertj.core.api.AbstractStringAssert; -import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; @SuppressWarnings("deprecation") // using deprecated semconv @@ -187,41 +181,25 @@ void testConsumerError() { testing.waitAndAssertTraces( trace -> trace.hasSpansSatisfyingExactly( - span -> { - span.hasName("unknown receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasStatus(StatusData.error()) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_SYSTEM, "kafka"), - equalTo(MESSAGING_OPERATION, "receive"), - equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 0)); - if (emitExceptionAsSpanEvents()) { - span.hasException(error); - } - })); + span -> + span.hasName("unknown receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(emitExceptionAsSpanEvents() ? error : null) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_SYSTEM, "kafka"), + equalTo(MESSAGING_OPERATION, "receive"), + equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 0)))); if (emitExceptionAsLogs()) { - Awaitility.await() - .untilAsserted( - () -> { - List logs = - testing.logRecords().stream() - .filter(log -> "messaging.receive.exception".equals(log.getEventName())) - .collect(toList()); - assertThat(logs).hasSize(1); - assertThat(logs.get(0)) - .hasSeverity(Severity.WARN) - .hasEventName("messaging.receive.exception") - .hasAttributesSatisfyingExactly( - equalTo(EXCEPTION_TYPE, error.getClass().getName()), - equalTo(EXCEPTION_MESSAGE, error.getMessage()), - satisfies( - EXCEPTION_STACKTRACE, - val -> - val.contains(error.getClass().getName()) - .contains("WrapperTest.testConsumerError"))); - }); + testing.waitAndAssertLogRecords( + logRecord -> + logRecord + .hasSeverity(Severity.WARN) + .hasEventName("messaging.receive.exception") + .hasException(error) + .hasTotalAttributeCount(3)); } } } From 07f3839aaef6ed07e1c721f0c71bd08d573c9262 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Sat, 6 Jun 2026 16:37:44 -0700 Subject: [PATCH 5/5] Relax messaging helper builder types --- .../MessagingExceptionEventExtractors.java | 48 +++++++------------ 1 file changed, 17 insertions(+), 31 deletions(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractors.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractors.java index ab8885c5a1f7..75e96c7ac3df 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractors.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/internal/MessagingExceptionEventExtractors.java @@ -5,7 +5,6 @@ package io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal; -import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.logs.Severity; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.internal.Experimental; @@ -21,11 +20,9 @@ public final class MessagingExceptionEventExtractors { * emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview} * flag. */ - @CanIgnoreReturnValue - public static - InstrumenterBuilder setMessagingCreateExceptionEventExtractor( - InstrumenterBuilder builder) { - return setExceptionEventExtractor(builder, "messaging.create.exception", Severity.WARN); + public static void setMessagingCreateExceptionEventExtractor( + InstrumenterBuilder builder) { + setExceptionEventExtractor(builder, "messaging.create.exception", Severity.WARN); } /** @@ -33,11 +30,9 @@ InstrumenterBuilder setMessagingCreateExceptionEventExtractor * emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview} * flag. */ - @CanIgnoreReturnValue - public static - InstrumenterBuilder setMessagingSendExceptionEventExtractor( - InstrumenterBuilder builder) { - return setExceptionEventExtractor(builder, "messaging.send.exception", Severity.WARN); + public static void setMessagingSendExceptionEventExtractor( + InstrumenterBuilder builder) { + setExceptionEventExtractor(builder, "messaging.send.exception", Severity.WARN); } /** @@ -45,11 +40,9 @@ InstrumenterBuilder setMessagingSendExceptionEventExtractor( * emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview} * flag. */ - @CanIgnoreReturnValue - public static - InstrumenterBuilder setMessagingReceiveExceptionEventExtractor( - InstrumenterBuilder builder) { - return setExceptionEventExtractor(builder, "messaging.receive.exception", Severity.WARN); + public static void setMessagingReceiveExceptionEventExtractor( + InstrumenterBuilder builder) { + setExceptionEventExtractor(builder, "messaging.receive.exception", Severity.WARN); } /** @@ -57,11 +50,9 @@ InstrumenterBuilder setMessagingReceiveExceptionEventExtracto * emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview} * flag. */ - @CanIgnoreReturnValue - public static - InstrumenterBuilder setMessagingSettleExceptionEventExtractor( - InstrumenterBuilder builder) { - return setExceptionEventExtractor(builder, "messaging.settle.exception", Severity.WARN); + public static void setMessagingSettleExceptionEventExtractor( + InstrumenterBuilder builder) { + setExceptionEventExtractor(builder, "messaging.settle.exception", Severity.WARN); } /** @@ -69,24 +60,19 @@ InstrumenterBuilder setMessagingSettleExceptionEventExtractor * emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview} * flag. */ - @CanIgnoreReturnValue - public static - InstrumenterBuilder setMessagingProcessExceptionEventExtractor( - InstrumenterBuilder builder) { - return setExceptionEventExtractor(builder, "messaging.process.exception", Severity.ERROR); + public static void setMessagingProcessExceptionEventExtractor( + InstrumenterBuilder builder) { + setExceptionEventExtractor(builder, "messaging.process.exception", Severity.ERROR); } - @CanIgnoreReturnValue - private static - InstrumenterBuilder setExceptionEventExtractor( - InstrumenterBuilder builder, String eventName, Severity severity) { + private static void setExceptionEventExtractor( + InstrumenterBuilder builder, String eventName, Severity severity) { Experimental.setExceptionEventExtractor( builder, (logRecordBuilder, context, request) -> { logRecordBuilder.setEventName(eventName); logRecordBuilder.setSeverity(severity); }); - return builder; } private MessagingExceptionEventExtractors() {}