66package io .opentelemetry .instrumentation .kafkaconnect .v2_6 ;
77
88import static io .opentelemetry .api .trace .SpanKind .CONSUMER ;
9+ import static io .opentelemetry .instrumentation .api .internal .SemconvStability .emitStableDatabaseSemconv ;
910import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .equalTo ;
1011import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .satisfies ;
1112import static io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes .MESSAGING_BATCH_MESSAGE_COUNT ;
1819import static io .opentelemetry .semconv .incubating .ThreadIncubatingAttributes .THREAD_NAME ;
1920import static io .restassured .RestAssured .given ;
2021import static java .lang .String .format ;
22+ import static org .assertj .core .api .Assertions .assertThat ;
2123import static org .awaitility .Awaitility .await ;
2224
2325import io .opentelemetry .api .trace .Span ;
@@ -160,8 +162,14 @@ void testSingleMessage() throws Exception {
160162 trace -> {
161163 // kafka connect consumer trace, linked to producer span via a span link
162164 Consumer <SpanDataAssert > selectAssertion =
163- span ->
164- span .hasName ("SELECT test" ).hasKind (SpanKind .CLIENT ).hasParent (trace .getSpan (0 ));
165+ span -> {
166+ if (emitStableDatabaseSemconv ()) {
167+ span .satisfies (spanData -> assertThat (spanData .getName ()).startsWith ("SELECT" ));
168+ } else {
169+ span .hasName ("SELECT " + DATABASE_NAME );
170+ }
171+ span .hasKind (SpanKind .CLIENT ).hasParent (trace .getSpan (0 ));
172+ };
165173
166174 trace .hasSpansSatisfyingExactly (
167175 span ->
@@ -182,7 +190,10 @@ void testSingleMessage() throws Exception {
182190 selectAssertion ,
183191 selectAssertion ,
184192 span ->
185- span .hasName ("INSERT test." + DB_TABLE_PERSON )
193+ span .hasName (
194+ emitStableDatabaseSemconv ()
195+ ? "INSERT \" " + DB_TABLE_PERSON + "\" "
196+ : "INSERT " + DATABASE_NAME + "." + DB_TABLE_PERSON )
186197 .hasKind (SpanKind .CLIENT )
187198 .hasParent (trace .getSpan (0 )));
188199 },
@@ -285,8 +296,14 @@ void testMultiTopic() throws Exception {
285296 trace -> {
286297 // kafka connect consumer trace, linked to producer span via a span link
287298 Consumer <SpanDataAssert > selectAssertion =
288- span ->
289- span .hasName ("SELECT test" ).hasKind (SpanKind .CLIENT ).hasParent (trace .getSpan (0 ));
299+ span -> {
300+ if (emitStableDatabaseSemconv ()) {
301+ span .satisfies (spanData -> assertThat (spanData .getName ()).startsWith ("SELECT" ));
302+ } else {
303+ span .hasName ("SELECT " + DATABASE_NAME );
304+ }
305+ span .hasKind (SpanKind .CLIENT ).hasParent (trace .getSpan (0 ));
306+ };
290307
291308 trace .hasSpansSatisfyingExactly (
292309 span ->
@@ -309,7 +326,10 @@ void testMultiTopic() throws Exception {
309326 selectAssertion ,
310327 selectAssertion ,
311328 span ->
312- span .hasName ("INSERT test." + DB_TABLE_PERSON )
329+ span .hasName (
330+ emitStableDatabaseSemconv ()
331+ ? "BATCH INSERT \" " + DB_TABLE_PERSON + "\" "
332+ : "INSERT " + DATABASE_NAME + "." + DB_TABLE_PERSON )
313333 .hasKind (SpanKind .CLIENT )
314334 .hasParent (trace .getSpan (0 )));
315335 },
0 commit comments