|
7 | 7 |
|
8 | 8 | import static io.opentelemetry.api.common.AttributeKey.longKey; |
9 | 9 | import static io.opentelemetry.api.common.AttributeKey.stringKey; |
| 10 | +import static io.opentelemetry.instrumentation.api.internal.SemconvExceptionSignal.emitExceptionAsLogs; |
| 11 | +import static io.opentelemetry.instrumentation.api.internal.SemconvExceptionSignal.emitExceptionAsSpanEvents; |
| 12 | +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; |
10 | 13 | import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; |
11 | 14 | import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; |
| 15 | +import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_STACKTRACE; |
| 16 | +import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_TYPE; |
12 | 17 | import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; |
13 | 18 | import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; |
14 | 19 | import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID; |
|
20 | 25 | import static java.nio.charset.StandardCharsets.UTF_8; |
21 | 26 | import static java.util.Arrays.asList; |
22 | 27 | import static java.util.Collections.singletonList; |
| 28 | +import static java.util.stream.Collectors.toList; |
23 | 29 | import static org.assertj.core.api.Assertions.assertThat; |
24 | 30 | import static org.assertj.core.api.Assertions.assertThatThrownBy; |
25 | 31 | import static org.mockito.Mockito.mock; |
26 | 32 | import static org.mockito.Mockito.when; |
27 | 33 |
|
| 34 | +import io.opentelemetry.api.logs.Severity; |
28 | 35 | import io.opentelemetry.api.trace.SpanContext; |
29 | 36 | import io.opentelemetry.api.trace.SpanKind; |
30 | 37 | import io.opentelemetry.instrumentation.testing.junit.message.MessageHeaderUtil; |
| 38 | +import io.opentelemetry.sdk.logs.data.LogRecordData; |
31 | 39 | import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; |
32 | 40 | import io.opentelemetry.sdk.trace.data.LinkData; |
33 | 41 | import io.opentelemetry.sdk.trace.data.StatusData; |
|
38 | 46 | import org.apache.kafka.clients.consumer.Consumer; |
39 | 47 | import org.assertj.core.api.AbstractLongAssert; |
40 | 48 | import org.assertj.core.api.AbstractStringAssert; |
| 49 | +import org.awaitility.Awaitility; |
41 | 50 | import org.junit.jupiter.api.Test; |
42 | 51 |
|
43 | 52 | @SuppressWarnings("deprecation") // using deprecated semconv |
@@ -177,15 +186,40 @@ void testConsumerError() { |
177 | 186 | testing.waitAndAssertTraces( |
178 | 187 | trace -> |
179 | 188 | trace.hasSpansSatisfyingExactly( |
180 | | - span -> |
181 | | - span.hasName("unknown receive") |
182 | | - .hasKind(SpanKind.CONSUMER) |
183 | | - .hasNoParent() |
184 | | - .hasStatus(StatusData.error()) |
185 | | - .hasException(new IllegalStateException()) |
186 | | - .hasAttributesSatisfyingExactly( |
187 | | - equalTo(MESSAGING_SYSTEM, "kafka"), |
188 | | - equalTo(MESSAGING_OPERATION, "receive"), |
189 | | - equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 0)))); |
| 189 | + span -> { |
| 190 | + span.hasName("unknown receive") |
| 191 | + .hasKind(SpanKind.CONSUMER) |
| 192 | + .hasNoParent() |
| 193 | + .hasStatus(StatusData.error()) |
| 194 | + .hasAttributesSatisfyingExactly( |
| 195 | + equalTo(MESSAGING_SYSTEM, "kafka"), |
| 196 | + equalTo(MESSAGING_OPERATION, "receive"), |
| 197 | + equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 0)); |
| 198 | + if (emitExceptionAsSpanEvents()) { |
| 199 | + span.hasException(new IllegalStateException()); |
| 200 | + } |
| 201 | + })); |
| 202 | + |
| 203 | + if (emitExceptionAsLogs()) { |
| 204 | + assertReceiveExceptionLog(); |
| 205 | + } |
| 206 | + } |
| 207 | + |
| 208 | + private static void assertReceiveExceptionLog() { |
| 209 | + Awaitility.await() |
| 210 | + .untilAsserted( |
| 211 | + () -> { |
| 212 | + List<LogRecordData> logs = |
| 213 | + testing.logRecords().stream() |
| 214 | + .filter(log -> "messaging.receive.exception".equals(log.getEventName())) |
| 215 | + .collect(toList()); |
| 216 | + assertThat(logs).hasSize(1); |
| 217 | + assertThat(logs.get(0)) |
| 218 | + .hasSeverity(Severity.WARN) |
| 219 | + .hasEventName("messaging.receive.exception") |
| 220 | + .hasAttributesSatisfyingExactly( |
| 221 | + satisfies(EXCEPTION_TYPE, val -> val.isNotNull()), |
| 222 | + satisfies(EXCEPTION_STACKTRACE, val -> val.isNotNull())); |
| 223 | + }); |
190 | 224 | } |
191 | 225 | } |
0 commit comments