Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal;

import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.internal.Experimental;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class MessagingExceptionEventExtractors {

/**
* Configures the messaging create exception event name and severity. Only takes effect when
* emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview}
* flag.
*/
public static <REQUEST> void setMessagingCreateExceptionEventExtractor(
InstrumenterBuilder<REQUEST, ?> builder) {
setExceptionEventExtractor(builder, "messaging.create.exception", Severity.WARN);
}

/**
* Configures the messaging send exception event name and severity. Only takes effect when
* emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview}
* flag.
*/
public static <REQUEST> void setMessagingSendExceptionEventExtractor(
InstrumenterBuilder<REQUEST, ?> builder) {
setExceptionEventExtractor(builder, "messaging.send.exception", Severity.WARN);
}

/**
* Configures the messaging receive exception event name and severity. Only takes effect when
* emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview}
* flag.
*/
public static <REQUEST> void setMessagingReceiveExceptionEventExtractor(
InstrumenterBuilder<REQUEST, ?> builder) {
setExceptionEventExtractor(builder, "messaging.receive.exception", Severity.WARN);
}

/**
* Configures the messaging settle exception event name and severity. Only takes effect when
* emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview}
* flag.
*/
public static <REQUEST> void setMessagingSettleExceptionEventExtractor(
InstrumenterBuilder<REQUEST, ?> builder) {
setExceptionEventExtractor(builder, "messaging.settle.exception", Severity.WARN);
}

/**
* Configures the messaging process exception event name and severity. Only takes effect when
* emitting exceptions as logs is enabled via the {@code otel.semconv.exception.signal.preview}
* flag.
*/
public static <REQUEST> void setMessagingProcessExceptionEventExtractor(
InstrumenterBuilder<REQUEST, ?> builder) {
setExceptionEventExtractor(builder, "messaging.process.exception", Severity.ERROR);
}

private static <REQUEST> void setExceptionEventExtractor(
InstrumenterBuilder<REQUEST, ?> builder, String eventName, Severity severity) {
Experimental.setExceptionEventExtractor(
builder,
(logRecordBuilder, context, request) -> {
logRecordBuilder.setEventName(eventName);
logRecordBuilder.setSeverity(severity);
});
}

private MessagingExceptionEventExtractors() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal;

import static io.opentelemetry.instrumentation.api.internal.SemconvExceptionSignal.emitExceptionAsLogs;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_MESSAGE;
import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_STACKTRACE;
import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_TYPE;

import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import java.util.List;
import java.util.function.Consumer;
import org.assertj.core.api.AbstractAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class MessagingExceptionEventExtractorsTest {

@RegisterExtension
static final OpenTelemetryExtension otelTesting = OpenTelemetryExtension.create();

@Test
void messagingCreateExceptionLog() {
assertExceptionLog(
MessagingExceptionEventExtractors::setMessagingCreateExceptionEventExtractor,
"messaging.create.exception",
Severity.WARN);
}

@Test
void messagingSendExceptionLog() {
assertExceptionLog(
MessagingExceptionEventExtractors::setMessagingSendExceptionEventExtractor,
"messaging.send.exception",
Severity.WARN);
}

@Test
void messagingReceiveExceptionLog() {
assertExceptionLog(
MessagingExceptionEventExtractors::setMessagingReceiveExceptionEventExtractor,
"messaging.receive.exception",
Severity.WARN);
}

@Test
void messagingSettleExceptionLog() {
assertExceptionLog(
MessagingExceptionEventExtractors::setMessagingSettleExceptionEventExtractor,
"messaging.settle.exception",
Severity.WARN);
}

@Test
void messagingProcessExceptionLog() {
assertExceptionLog(
MessagingExceptionEventExtractors::setMessagingProcessExceptionEventExtractor,
"messaging.process.exception",
Severity.ERROR);
}

private static void assertExceptionLog(
Consumer<InstrumenterBuilder<String, String>> configure,
String expectedEventName,
Severity expectedSeverity) {
InstrumenterBuilder<String, String> builder =
Instrumenter.builder(otelTesting.getOpenTelemetry(), "test", unused -> "span");
configure.accept(builder);
Instrumenter<String, String> instrumenter = builder.buildInstrumenter();

Context context = instrumenter.start(Context.root(), "request");
IllegalStateException error = new IllegalStateException("test");
instrumenter.end(context, "request", "response", error);

List<LogRecordData> logs = otelTesting.getLogRecords();
if (emitExceptionAsLogs()) {
assertThat(logs).hasSize(1);
assertThat(logs.get(0))
.hasSeverity(expectedSeverity)
.hasEventName(expectedEventName)
.hasAttributesSatisfyingExactly(
equalTo(EXCEPTION_TYPE, "java.lang.IllegalStateException"),
equalTo(EXCEPTION_MESSAGE, "test"),
satisfies(EXCEPTION_STACKTRACE, AbstractAssert::isNotNull));
} else {
assertThat(logs).isEmpty();
}
}
Comment thread
trask marked this conversation as resolved.
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package io.opentelemetry.javaagent.instrumentation.jms.common.v1_1;

import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor;
import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingReceiveExceptionEventExtractor;
import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingSendExceptionEventExtractor;
import static java.util.Collections.emptyList;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
Expand Down Expand Up @@ -50,12 +53,14 @@ public Instrumenter<MessageWithDestination, Void> createProducerInstrumenter() {
JmsMessageAttributesGetter getter = new JmsMessageAttributesGetter();
MessageOperation operation = MessageOperation.PUBLISH;

return Instrumenter.<MessageWithDestination, Void>builder(
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(createMessagingAttributesExtractor(operation))
.buildProducerInstrumenter(new MessagePropertySetter());
InstrumenterBuilder<MessageWithDestination, Void> builder =
Instrumenter.<MessageWithDestination, Void>builder(
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(createMessagingAttributesExtractor(operation));
setMessagingSendExceptionEventExtractor(builder);
return builder.buildProducerInstrumenter(new MessagePropertySetter());
}

public Instrumenter<MessageWithDestination, Void> createConsumerReceiveInstrumenter() {
Expand All @@ -68,6 +73,7 @@ public Instrumenter<MessageWithDestination, Void> createConsumerReceiveInstrumen
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(createMessagingAttributesExtractor(operation));
setMessagingReceiveExceptionEventExtractor(builder);
if (messagingReceiveInstrumentationEnabled) {
builder.addSpanLinksExtractor(
new PropagatorBasedSpanLinksExtractor<>(
Expand All @@ -88,6 +94,7 @@ public Instrumenter<MessageWithDestination, Void> createConsumerProcessInstrumen
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(createMessagingAttributesExtractor(operation));
setMessagingProcessExceptionEventExtractor(builder);
if (canHaveReceiveInstrumentation && messagingReceiveInstrumentationEnabled) {
builder.addSpanLinksExtractor(
new PropagatorBasedSpanLinksExtractor<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,26 @@ dependencies {
}

tasks {
test {
withType<Test>().configureEach {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
systemProperty("testLatestDeps", otelProps.testLatestDeps)
systemProperty("collectMetadata", otelProps.collectMetadata)
}

val testExceptionSignalLogs by registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath

filter {
includeTestsMatching("WrapperTest")
}
jvmArgs("-Dotel.semconv.exception.signal.preview=logs")
systemProperty("metadataConfig", "otel.semconv.exception.signal.preview=logs")
}
Comment thread
trask marked this conversation as resolved.

check {
dependsOn(testExceptionSignalLogs)
}
}

// kafka 4.1 requires java 11
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

import static io.opentelemetry.api.common.AttributeKey.longKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.instrumentation.api.internal.SemconvExceptionSignal.emitExceptionAsLogs;
import static io.opentelemetry.instrumentation.api.internal.SemconvExceptionSignal.emitExceptionAsSpanEvents;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT;
Expand All @@ -25,6 +28,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.message.MessageHeaderUtil;
Expand Down Expand Up @@ -169,10 +173,10 @@ void testConsumerError() {
KafkaTelemetry telemetry = telemetryBuilder.build();

Consumer<?, ?> mockConsumer = mock();
when(mockConsumer.poll(Duration.ofSeconds(10))).thenThrow(new IllegalStateException());
IllegalStateException error = new IllegalStateException("test");
when(mockConsumer.poll(Duration.ofSeconds(10))).thenThrow(error);
Consumer<?, ?> wrappedConsumer = telemetry.wrap(mockConsumer);
assertThatThrownBy(() -> wrappedConsumer.poll(Duration.ofSeconds(10)))
.isInstanceOf(IllegalStateException.class);
assertThatThrownBy(() -> wrappedConsumer.poll(Duration.ofSeconds(10))).isSameAs(error);

testing.waitAndAssertTraces(
trace ->
Expand All @@ -182,10 +186,20 @@ void testConsumerError() {
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(new IllegalStateException())
.hasException(emitExceptionAsSpanEvents() ? error : null)
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_OPERATION, "receive"),
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 0))));

if (emitExceptionAsLogs()) {
testing.waitAndAssertLogRecords(
logRecord ->
logRecord
.hasSeverity(Severity.WARN)
.hasEventName("messaging.receive.exception")
.hasException(error)
.hasTotalAttributeCount(3));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal;

import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingProcessExceptionEventExtractor;
import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingReceiveExceptionEventExtractor;
import static io.opentelemetry.instrumentation.api.incubator.semconv.messaging.internal.MessagingExceptionEventExtractors.setMessagingSendExceptionEventExtractor;
import static java.util.Collections.emptyList;

import com.google.errorprone.annotations.CanIgnoreReturnValue;
Expand Down Expand Up @@ -91,6 +94,7 @@ public Instrumenter<KafkaProducerRequest, RecordMetadata> createProducerInstrume
if (captureExperimentalSpanAttributes) {
builder.addAttributesExtractor(new KafkaProducerExperimentalAttributesExtractor());
}
setMessagingSendExceptionEventExtractor(builder);
return builder.buildInstrumenter(SpanKindExtractor.alwaysProducer());
}

Expand All @@ -103,17 +107,19 @@ public Instrumenter<KafkaReceiveRequest, Void> createConsumerReceiveInstrumenter
KafkaReceiveAttributesGetter getter = new KafkaReceiveAttributesGetter();
MessageOperation operation = MessageOperation.RECEIVE;

return Instrumenter.<KafkaReceiveRequest, Void>builder(
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(
buildMessagingAttributesExtractor(getter, operation, capturedHeaders))
.addAttributesExtractor(new KafkaReceiveAttributesExtractor())
.addAttributesExtractors(extractors)
.setErrorCauseExtractor(errorCauseExtractor)
.setEnabled(messagingReceiveInstrumentationEnabled)
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
InstrumenterBuilder<KafkaReceiveRequest, Void> builder =
Instrumenter.<KafkaReceiveRequest, Void>builder(
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(
buildMessagingAttributesExtractor(getter, operation, capturedHeaders))
.addAttributesExtractor(new KafkaReceiveAttributesExtractor())
.addAttributesExtractors(extractors)
.setErrorCauseExtractor(errorCauseExtractor)
.setEnabled(messagingReceiveInstrumentationEnabled);
setMessagingReceiveExceptionEventExtractor(builder);
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

public Instrumenter<KafkaProcessRequest, Void> createConsumerProcessInstrumenter() {
Expand All @@ -138,6 +144,7 @@ public Instrumenter<KafkaProcessRequest, Void> createConsumerProcessInstrumenter
if (captureExperimentalSpanAttributes) {
builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
}
setMessagingProcessExceptionEventExtractor(builder);

if (messagingReceiveInstrumentationEnabled) {
builder.addSpanLinksExtractor(
Expand All @@ -154,18 +161,20 @@ public Instrumenter<KafkaReceiveRequest, Void> createBatchProcessInstrumenter()
KafkaReceiveAttributesGetter getter = new KafkaReceiveAttributesGetter();
MessageOperation operation = MessageOperation.PROCESS;

return Instrumenter.<KafkaReceiveRequest, Void>builder(
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(
buildMessagingAttributesExtractor(getter, operation, capturedHeaders))
.addAttributesExtractor(new KafkaReceiveAttributesExtractor())
.addSpanLinksExtractor(
new KafkaBatchProcessSpanLinksExtractor(
openTelemetry.getPropagators().getTextMapPropagator()))
.setErrorCauseExtractor(errorCauseExtractor)
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
InstrumenterBuilder<KafkaReceiveRequest, Void> builder =
Instrumenter.<KafkaReceiveRequest, Void>builder(
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(
buildMessagingAttributesExtractor(getter, operation, capturedHeaders))
.addAttributesExtractor(new KafkaReceiveAttributesExtractor())
.addSpanLinksExtractor(
new KafkaBatchProcessSpanLinksExtractor(
openTelemetry.getPropagators().getTextMapPropagator()))
.setErrorCauseExtractor(errorCauseExtractor);
setMessagingProcessExceptionEventExtractor(builder);
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

private static <REQUEST, RESPONSE>
Expand Down
Loading
Loading