|
12 | 12 | import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; |
13 | 13 | import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; |
14 | 14 | import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; |
15 | | -import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_MESSAGE; |
16 | | -import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_STACKTRACE; |
17 | | -import static io.opentelemetry.semconv.ExceptionAttributes.EXCEPTION_TYPE; |
18 | 15 | import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; |
19 | 16 | import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; |
20 | 17 | import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID; |
|
26 | 23 | import static java.nio.charset.StandardCharsets.UTF_8; |
27 | 24 | import static java.util.Arrays.asList; |
28 | 25 | import static java.util.Collections.singletonList; |
29 | | -import static java.util.stream.Collectors.toList; |
30 | 26 | import static org.assertj.core.api.Assertions.assertThat; |
31 | 27 | import static org.assertj.core.api.Assertions.assertThatThrownBy; |
32 | 28 | import static org.mockito.Mockito.mock; |
|
36 | 32 | import io.opentelemetry.api.trace.SpanContext; |
37 | 33 | import io.opentelemetry.api.trace.SpanKind; |
38 | 34 | import io.opentelemetry.instrumentation.testing.junit.message.MessageHeaderUtil; |
39 | | -import io.opentelemetry.sdk.logs.data.LogRecordData; |
40 | 35 | import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; |
41 | 36 | import io.opentelemetry.sdk.trace.data.LinkData; |
42 | 37 | import io.opentelemetry.sdk.trace.data.StatusData; |
|
47 | 42 | import org.apache.kafka.clients.consumer.Consumer; |
48 | 43 | import org.assertj.core.api.AbstractLongAssert; |
49 | 44 | import org.assertj.core.api.AbstractStringAssert; |
50 | | -import org.awaitility.Awaitility; |
51 | 45 | import org.junit.jupiter.api.Test; |
52 | 46 |
|
53 | 47 | @SuppressWarnings("deprecation") // using deprecated semconv |
@@ -187,41 +181,25 @@ void testConsumerError() { |
187 | 181 | testing.waitAndAssertTraces( |
188 | 182 | trace -> |
189 | 183 | trace.hasSpansSatisfyingExactly( |
190 | | - span -> { |
191 | | - span.hasName("unknown receive") |
192 | | - .hasKind(SpanKind.CONSUMER) |
193 | | - .hasNoParent() |
194 | | - .hasStatus(StatusData.error()) |
195 | | - .hasAttributesSatisfyingExactly( |
196 | | - equalTo(MESSAGING_SYSTEM, "kafka"), |
197 | | - equalTo(MESSAGING_OPERATION, "receive"), |
198 | | - equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 0)); |
199 | | - if (emitExceptionAsSpanEvents()) { |
200 | | - span.hasException(error); |
201 | | - } |
202 | | - })); |
| 184 | + span -> |
| 185 | + span.hasName("unknown receive") |
| 186 | + .hasKind(SpanKind.CONSUMER) |
| 187 | + .hasNoParent() |
| 188 | + .hasStatus(StatusData.error()) |
| 189 | + .hasException(emitExceptionAsSpanEvents() ? error : null) |
| 190 | + .hasAttributesSatisfyingExactly( |
| 191 | + equalTo(MESSAGING_SYSTEM, "kafka"), |
| 192 | + equalTo(MESSAGING_OPERATION, "receive"), |
| 193 | + equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 0)))); |
203 | 194 |
|
204 | 195 | if (emitExceptionAsLogs()) { |
205 | | - Awaitility.await() |
206 | | - .untilAsserted( |
207 | | - () -> { |
208 | | - List<LogRecordData> logs = |
209 | | - testing.logRecords().stream() |
210 | | - .filter(log -> "messaging.receive.exception".equals(log.getEventName())) |
211 | | - .collect(toList()); |
212 | | - assertThat(logs).hasSize(1); |
213 | | - assertThat(logs.get(0)) |
214 | | - .hasSeverity(Severity.WARN) |
215 | | - .hasEventName("messaging.receive.exception") |
216 | | - .hasAttributesSatisfyingExactly( |
217 | | - equalTo(EXCEPTION_TYPE, error.getClass().getName()), |
218 | | - equalTo(EXCEPTION_MESSAGE, error.getMessage()), |
219 | | - satisfies( |
220 | | - EXCEPTION_STACKTRACE, |
221 | | - val -> |
222 | | - val.contains(error.getClass().getName()) |
223 | | - .contains("WrapperTest.testConsumerError"))); |
224 | | - }); |
| 196 | + testing.waitAndAssertLogRecords( |
| 197 | + logRecord -> |
| 198 | + logRecord |
| 199 | + .hasSeverity(Severity.WARN) |
| 200 | + .hasEventName("messaging.receive.exception") |
| 201 | + .hasException(error) |
| 202 | + .hasTotalAttributeCount(3)); |
225 | 203 | } |
226 | 204 | } |
227 | 205 | } |
0 commit comments