1212import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .assertThat ;
1313import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .equalTo ;
1414import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .satisfies ;
15+ import static io .opentelemetry .semconv .ExceptionAttributes .EXCEPTION_MESSAGE ;
1516import static io .opentelemetry .semconv .ExceptionAttributes .EXCEPTION_STACKTRACE ;
1617import static io .opentelemetry .semconv .ExceptionAttributes .EXCEPTION_TYPE ;
1718import static io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes .MESSAGING_BATCH_MESSAGE_COUNT ;
@@ -178,10 +179,10 @@ void testConsumerError() {
178179 KafkaTelemetry telemetry = telemetryBuilder .build ();
179180
180181 Consumer <?, ?> mockConsumer = mock ();
181- when (mockConsumer .poll (Duration .ofSeconds (10 ))).thenThrow (new IllegalStateException ());
182+ IllegalStateException error = new IllegalStateException ("test" );
183+ when (mockConsumer .poll (Duration .ofSeconds (10 ))).thenThrow (error );
182184 Consumer <?, ?> wrappedConsumer = telemetry .wrap (mockConsumer );
183- assertThatThrownBy (() -> wrappedConsumer .poll (Duration .ofSeconds (10 )))
184- .isInstanceOf (IllegalStateException .class );
185+ assertThatThrownBy (() -> wrappedConsumer .poll (Duration .ofSeconds (10 ))).isSameAs (error );
185186
186187 testing .waitAndAssertTraces (
187188 trace ->
@@ -196,30 +197,31 @@ void testConsumerError() {
196197 equalTo (MESSAGING_OPERATION , "receive" ),
197198 equalTo (MESSAGING_BATCH_MESSAGE_COUNT , 0 ));
198199 if (emitExceptionAsSpanEvents ()) {
199- span .hasException (new IllegalStateException () );
200+ span .hasException (error );
200201 }
201202 }));
202203
203204 if (emitExceptionAsLogs ()) {
204- assertReceiveExceptionLog ();
205+ Awaitility .await ()
206+ .untilAsserted (
207+ () -> {
208+ List <LogRecordData > logs =
209+ testing .logRecords ().stream ()
210+ .filter (log -> "messaging.receive.exception" .equals (log .getEventName ()))
211+ .collect (toList ());
212+ assertThat (logs ).hasSize (1 );
213+ assertThat (logs .get (0 ))
214+ .hasSeverity (Severity .WARN )
215+ .hasEventName ("messaging.receive.exception" )
216+ .hasAttributesSatisfyingExactly (
217+ equalTo (EXCEPTION_TYPE , error .getClass ().getName ()),
218+ equalTo (EXCEPTION_MESSAGE , error .getMessage ()),
219+ satisfies (
220+ EXCEPTION_STACKTRACE ,
221+ val ->
222+ val .contains (error .getClass ().getName ())
223+ .contains ("WrapperTest.testConsumerError" )));
224+ });
205225 }
206226 }
207-
208- private static void assertReceiveExceptionLog () {
209- Awaitility .await ()
210- .untilAsserted (
211- () -> {
212- List <LogRecordData > logs =
213- testing .logRecords ().stream ()
214- .filter (log -> "messaging.receive.exception" .equals (log .getEventName ()))
215- .collect (toList ());
216- assertThat (logs ).hasSize (1 );
217- assertThat (logs .get (0 ))
218- .hasSeverity (Severity .WARN )
219- .hasEventName ("messaging.receive.exception" )
220- .hasAttributesSatisfyingExactly (
221- satisfies (EXCEPTION_TYPE , val -> val .isNotNull ()),
222- satisfies (EXCEPTION_STACKTRACE , val -> val .isNotNull ()));
223- });
224- }
225227}
0 commit comments