From 2af0f4bf6093079af197adc263718dbbb8c3435b Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Wed, 10 Jun 2026 06:14:08 -0700 Subject: [PATCH 01/10] Support mixed Cassandra batch query sanitization --- .../semconv/db/DbClientSpanNameExtractor.java | 2 +- .../api/incubator/semconv/db/MultiQuery.java | 41 ++++-- .../db/SqlClientAttributesExtractor.java | 17 ++- .../semconv/db/SqlClientAttributesGetter.java | 18 ++- .../db/SqlClientAttributesExtractorTest.java | 61 +++++++++ .../cassandra/v3_0/CassandraRequest.java | 104 +++++++++++++- .../v3_0/CassandraSqlAttributesGetter.java | 13 +- .../cassandra/v3_0/TracingSession.java | 24 +--- .../cassandra/v3_0/CassandraClientTest.java | 100 ++++++++++++++ .../cassandra/v4_0/CassandraRequest.java | 98 +++++++++++++- .../v4_0/CassandraSqlAttributesGetter.java | 13 +- .../cassandra/v4_0/TracingCqlSession.java | 25 +--- .../v4_4/CassandraAttributesExtractor.java | 2 + .../cassandra/v4_4/CassandraRequest.java | 127 ++++++++++++++++- .../v4_4/CassandraSqlAttributesGetter.java | 15 +- .../cassandra/v4_4/TracingCqlSession.java | 25 +--- .../common/v4_0/AbstractCassandraTest.java | 128 ++++++++++++++++++ .../jdbc/internal/JdbcAttributesGetter.java | 3 +- .../internal/R2dbcSqlAttributesGetter.java | 3 +- .../v4_0/VertxSqlClientAttributesGetter.java | 3 +- 20 files changed, 712 insertions(+), 110 deletions(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/DbClientSpanNameExtractor.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/DbClientSpanNameExtractor.java index 9dc38ca35aeb..28172e8cb82e 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/DbClientSpanNameExtractor.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/DbClientSpanNameExtractor.java @@ -227,7 +227,7 @@ public String extract(REQUEST request) { getter, request, batch ? "BATCH" : null, null, analyzedQuery.getStoredProcedureName()); } - MultiQuery multiQuery = MultiQuery.analyzeWithSummary(rawQueryTexts, dialect, false); + MultiQuery multiQuery = MultiQuery.analyzeWithSummary(rawQueryTexts, dialect); String querySummary = multiQuery.getQuerySummary(); if (querySummary != null) { return querySummary; diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java index a0ef7636e331..29479d915f1b 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java @@ -23,23 +23,18 @@ private MultiQuery( this.querySummary = querySummary; } - static MultiQuery analyzeWithSummary( - Collection rawQueryTexts, SqlDialect dialect, boolean querySanitizationEnabled) { - UniqueValue uniqueStoredProcedureName = new UniqueValue(); - Set uniqueQueryTexts = new LinkedHashSet<>(); - UniqueValue uniqueQuerySummary = new UniqueValue(); + static MultiQuery analyzeWithSummary(Collection rawQueryTexts, SqlDialect dialect) { + Builder builder = builder(); for (String rawQueryText : rawQueryTexts) { SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect); - uniqueStoredProcedureName.set(analyzedQuery.getStoredProcedureName()); - uniqueQueryTexts.add(querySanitizationEnabled ? analyzedQuery.getQueryText() : rawQueryText); - uniqueQuerySummary.set(analyzedQuery.getQuerySummary()); + builder.add(analyzedQuery, rawQueryText); } - String querySummary = uniqueQuerySummary.getValue(); - return new MultiQuery( - uniqueStoredProcedureName.getValue(), - uniqueQueryTexts, - querySummary == null ? "BATCH" : "BATCH " + querySummary); + return builder.build(); + } + + static Builder builder() { + return new Builder(); } @Nullable @@ -56,6 +51,26 @@ public Set getQueryTexts() { return queryTexts; } + static class Builder { + private final UniqueValue uniqueStoredProcedureName = new UniqueValue(); + private final Set uniqueQueryTexts = new LinkedHashSet<>(); + private final UniqueValue uniqueQuerySummary = new UniqueValue(); + + void add(SqlQuery analyzedQuery, @Nullable String queryText) { + uniqueStoredProcedureName.set(analyzedQuery.getStoredProcedureName()); + uniqueQueryTexts.add(queryText); + uniqueQuerySummary.set(analyzedQuery.getQuerySummary()); + } + + MultiQuery build() { + String querySummary = uniqueQuerySummary.getValue(); + return new MultiQuery( + uniqueStoredProcedureName.getValue(), + uniqueQueryTexts, + querySummary == null ? "BATCH" : "BATCH " + querySummary); + } + } + private static class UniqueValue { @Nullable private String value; private boolean valid = true; diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractor.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractor.java index 856c5e6b3f61..874de62b85f5 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractor.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractor.java @@ -109,11 +109,11 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST if (isBatch) { attributes.put(DB_OPERATION_BATCH_SIZE, batchSize); } - boolean parameterizedQuery = getter.isParameterizedQuery(request); - boolean shouldSanitize = querySanitizationEnabled && !parameterizedQuery; if (rawQueryTexts.size() == 1) { String rawQueryText = rawQueryTexts.iterator().next(); SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect); + boolean shouldSanitize = + querySanitizationEnabled && !getter.isParameterizedQuery(request, 0); attributes.put(DB_QUERY_TEXT, shouldSanitize ? analyzedQuery.getQueryText() : rawQueryText); String querySummary = analyzedQuery.getQuerySummary(); attributes.put( @@ -125,9 +125,16 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST } attributes.put(DB_STORED_PROCEDURE_NAME, analyzedQuery.getStoredProcedureName()); } else if (rawQueryTexts.size() > 1) { - MultiQuery multiQuery = - MultiQuery.analyzeWithSummary( - getter.getRawQueryTexts(request), dialect, shouldSanitize); + MultiQuery.Builder builder = MultiQuery.builder(); + int queryIndex = 0; + for (String rawQueryText : rawQueryTexts) { + SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect); + boolean shouldSanitize = + querySanitizationEnabled && !getter.isParameterizedQuery(request, queryIndex); + builder.add(analyzedQuery, shouldSanitize ? analyzedQuery.getQueryText() : rawQueryText); + queryIndex++; + } + MultiQuery multiQuery = builder.build(); attributes.put(DB_QUERY_TEXT, join("; ", multiQuery.getQueryTexts())); attributes.put(DB_QUERY_SUMMARY, multiQuery.getQuerySummary()); attributes.put(DB_STORED_PROCEDURE_NAME, multiQuery.getStoredProcedureName()); diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesGetter.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesGetter.java index d3ee2b078192..c3b2ac869b47 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesGetter.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesGetter.java @@ -71,9 +71,25 @@ default String getDbQuerySummary(REQUEST request) { * query does not need to be sanitized. See sanitization * of db.query.text. + * + * @deprecated use {@link #isParameterizedQuery(Object, int)} instead */ - // TODO: make this required to implement + @Deprecated default boolean isParameterizedQuery(REQUEST request) { return false; } + + /** + * Returns whether the query at {@code queryIndex} in {@link #getRawQueryTexts(Object)} is + * parameterized. + * + *

The {@code queryIndex} is zero-based and follows the iteration order of {@link + * #getRawQueryTexts(Object)}. This supports batch operations where individual entries may have + * different parameterization. + */ + // TODO: make this required to implement + @SuppressWarnings("deprecation") + default boolean isParameterizedQuery(REQUEST request, int queryIndex) { + return isParameterizedQuery(request); + } } diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java index 22c873881cde..8d37a5328e18 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java @@ -36,6 +36,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.junit.jupiter.api.Test; @@ -108,6 +109,16 @@ static class TestMultiAttributesGetter extends TestAttributesGetter public Collection getRawQueryTexts(Map map) { return (Collection) map.get("db.query.texts"); } + + @SuppressWarnings("unchecked") + @Override + public boolean isParameterizedQuery(Map map, int queryIndex) { + List parameterizedQueries = (List) map.get("db.query.parameterized"); + if (parameterizedQueries == null) { + return super.isParameterizedQuery(map, queryIndex); + } + return parameterizedQueries.get(queryIndex); + } } @SuppressWarnings("deprecation") // TODO DB_CONNECTION_STRING deprecation @@ -351,6 +362,56 @@ void shouldExtractMultiQueryBatchAttributes() { assertThat(endAttributes.build().isEmpty()).isTrue(); } + @Test + void shouldExtractMixedParameterizedMultiQueryBatchAttributes() { + // given + Map request = new HashMap<>(); + request.put("db.namespace", "potatoes"); + request.put( + "db.query.texts", + asList("INSERT INTO potato VALUES(?)", "UPDATE potato SET name='bob' WHERE id=1")); + request.put("db.query.parameterized", asList(true, false)); + request.put(DB_OPERATION_BATCH_SIZE.getKey(), 2L); + + Context context = Context.root(); + + AttributesExtractor, Void> underTest = + SqlClientAttributesExtractor.create(new TestMultiAttributesGetter()); + + // when + AttributesBuilder startAttributes = Attributes.builder(); + underTest.onStart(startAttributes, context, request); + + AttributesBuilder endAttributes = Attributes.builder(); + underTest.onEnd(endAttributes, context, request, null, null); + + // then + if (emitStableDatabaseSemconv() && emitOldDatabaseSemconv()) { + assertThat(startAttributes.build()) + .containsOnly( + entry(DB_NAME, "potatoes"), + entry(DB_NAMESPACE, "potatoes"), + entry( + DB_QUERY_TEXT, + "INSERT INTO potato VALUES(?); UPDATE potato SET name=? WHERE id=?"), + entry(DB_QUERY_SUMMARY, "BATCH"), + entry(DB_OPERATION_BATCH_SIZE, 2L)); + } else if (emitOldDatabaseSemconv()) { + assertThat(startAttributes.build()).containsOnly(entry(DB_NAME, "potatoes")); + } else if (emitStableDatabaseSemconv()) { + assertThat(startAttributes.build()) + .containsOnly( + entry(DB_NAMESPACE, "potatoes"), + entry( + DB_QUERY_TEXT, + "INSERT INTO potato VALUES(?); UPDATE potato SET name=? WHERE id=?"), + entry(DB_QUERY_SUMMARY, "BATCH"), + entry(DB_OPERATION_BATCH_SIZE, 2L)); + } + + assertThat(endAttributes.build().isEmpty()).isTrue(); + } + @Test void shouldIgnoreBatchSizeOne() { // given diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java index 3dd0971e668a..b65012b17ee1 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java @@ -5,20 +5,112 @@ package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0; +import static java.util.Collections.singleton; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.RegularStatement; import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; @AutoValue abstract class CassandraRequest { - public static CassandraRequest create( - Session session, String queryText, boolean parameterizedQuery) { - return new AutoValue_CassandraRequest(session, queryText, parameterizedQuery); + static CassandraRequest create(Session session, String queryText, boolean parameterizedQuery) { + return create(session, singleton(queryText), parameterizedQuery, null, null); + } + + static CassandraRequest create(Session session, String queryText) { + return create(session, singleton(queryText), false, null, null); + } + + static CassandraRequest create(Session session, Statement statement) { + if (statement instanceof BatchStatement) { + return create(session, (BatchStatement) statement); + } + return create( + session, singleton(getQuery(statement)), statement instanceof BoundStatement, null, null); + } + + private static CassandraRequest create(Session session, BatchStatement batchStatement) { + List queryTexts = new ArrayList<>(); + List parameterizedQueries = null; + boolean allParameterized = true; + Boolean firstParameterizedQuery = null; + int queryIndex = 0; + for (Statement batchEntry : batchStatement.getStatements()) { + queryTexts.add(getQuery(batchEntry)); + boolean parameterizedQuery = batchEntry instanceof BoundStatement; + if (!parameterizedQuery) { + allParameterized = false; + } + if (firstParameterizedQuery == null) { + firstParameterizedQuery = parameterizedQuery; + } else if (parameterizedQuery != firstParameterizedQuery && parameterizedQueries == null) { + parameterizedQueries = new ArrayList<>(batchStatement.size()); + for (int previousQueryIndex = 0; previousQueryIndex < queryIndex; previousQueryIndex++) { + parameterizedQueries.add(firstParameterizedQuery); + } + } + if (parameterizedQueries != null) { + parameterizedQueries.add(parameterizedQuery); + } + queryIndex++; + } + boolean parameterizedQuery = allParameterized; + if (parameterizedQueries == null && firstParameterizedQuery != null) { + parameterizedQuery = firstParameterizedQuery; + } + return create( + session, + queryTexts, + parameterizedQuery, + parameterizedQueries, + Long.valueOf(batchStatement.size())); + } + + private static CassandraRequest create( + Session session, + Collection queryTexts, + boolean parameterizedQuery, + @Nullable List parameterizedQueries, + @Nullable Long batchSize) { + return new AutoValue_CassandraRequest( + session, queryTexts, parameterizedQuery, parameterizedQueries, batchSize); + } + + private static String getQuery(Statement statement) { + String query = null; + if (statement instanceof BoundStatement) { + query = ((BoundStatement) statement).preparedStatement().getQueryString(); + } else if (statement instanceof RegularStatement) { + query = ((RegularStatement) statement).getQueryString(); + } + + return query == null ? "" : query; } - public abstract Session getSession(); + abstract Session getSession(); - public abstract String getQueryText(); + abstract Collection getQueryTexts(); + + abstract boolean parameterizedQuery(); + + @Nullable + abstract List getParameterizedQueries(); + + boolean isParameterizedQuery(int queryIndex) { + List parameterizedQueries = getParameterizedQueries(); + return parameterizedQueries == null + ? parameterizedQuery() + : parameterizedQueries.get(queryIndex); + } - public abstract boolean isParameterizedQuery(); + @Nullable + abstract Long getBatchSize(); } diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraSqlAttributesGetter.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraSqlAttributesGetter.java index f3631de1b095..f1e962d6dc21 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraSqlAttributesGetter.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraSqlAttributesGetter.java @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0; import static io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlDialect.DOUBLE_QUOTES_ARE_IDENTIFIERS; -import static java.util.Collections.singleton; import com.datastax.driver.core.ExecutionInfo; import io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlClientAttributesGetter; @@ -39,7 +38,13 @@ public String getDbNamespace(CassandraRequest request) { @Override public Collection getRawQueryTexts(CassandraRequest request) { - return singleton(request.getQueryText()); + return request.getQueryTexts(); + } + + @Override + @Nullable + public Long getDbOperationBatchSize(CassandraRequest request) { + return request.getBatchSize(); } @Nullable @@ -50,7 +55,7 @@ public InetSocketAddress getNetworkPeerInetSocketAddress( } @Override - public boolean isParameterizedQuery(CassandraRequest request) { - return request.isParameterizedQuery(); + public boolean isParameterizedQuery(CassandraRequest request, int queryIndex) { + return request.isParameterizedQuery(queryIndex); } } diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/TracingSession.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/TracingSession.java index 799677a70d74..83454f1ab6d7 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/TracingSession.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/TracingSession.java @@ -7,7 +7,6 @@ import static io.opentelemetry.javaagent.instrumentation.cassandra.v3_0.CassandraSingletons.instrumenter; -import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.CloseFuture; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.PreparedStatement; @@ -48,7 +47,7 @@ public ListenableFuture initAsync() { @Override public ResultSet execute(String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); Context context = instrumenter().start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -93,9 +92,7 @@ public ResultSet execute(String query, Map values) { @Override public ResultSet execute(Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); Context context = instrumenter().start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -110,7 +107,7 @@ public ResultSet execute(Statement statement) { @Override public ResultSetFuture executeAsync(String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); Context context = instrumenter().start(Context.current(), request); try (Scope ignored = context.makeCurrent()) { ResultSetFuture future = session.executeAsync(query); @@ -152,9 +149,7 @@ public ResultSetFuture executeAsync(String query, Map values) { @Override public ResultSetFuture executeAsync(Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); Context context = instrumenter().start(Context.current(), request); try (Scope ignored = context.makeCurrent()) { ResultSetFuture future = session.executeAsync(statement); @@ -211,17 +206,6 @@ public State getState() { return session.getState(); } - private static String getQuery(Statement statement) { - String query = null; - if (statement instanceof BoundStatement) { - query = ((BoundStatement) statement).preparedStatement().getQueryString(); - } else if (statement instanceof RegularStatement) { - query = ((RegularStatement) statement).getQueryString(); - } - - return query == null ? "" : query; - } - private static void addCallbackToEndSpan( ResultSetFuture future, Context context, CassandraRequest request) { Futures.addCallback( diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java index 4de7930899d2..1f9b006b923d 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java @@ -10,6 +10,7 @@ import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.semconv.DbAttributes.DB_COLLECTION_NAME; +import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_BATCH_SIZE; import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_NAME; import static io.opentelemetry.semconv.DbAttributes.DB_QUERY_SUMMARY; import static io.opentelemetry.semconv.DbAttributes.DB_SYSTEM_NAME; @@ -26,9 +27,12 @@ import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DbSystemNameIncubatingValues.CASSANDRA; import static org.junit.jupiter.api.Named.named; +import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; @@ -291,6 +295,102 @@ void testMetrics() { SERVER_PORT); } + @Test + void batchStatementWithSameQuery() { + Session session = cluster.connect(); + cleanup.deferCleanup(session); + + session.execute("DROP KEYSPACE IF EXISTS batch_same_test"); + session.execute( + "CREATE KEYSPACE batch_same_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); + session.execute("CREATE TABLE batch_same_test.users ( name text PRIMARY KEY, age int )"); + PreparedStatement preparedStatement = + session.prepare("INSERT INTO batch_same_test.users (name, age) values (?, ?)"); + testing.clearData(); + + BatchStatement batchStatement = + new BatchStatement() + .add(preparedStatement.bind("alice", 1)) + .add(preparedStatement.bind("bob", 2)); + session.execute(batchStatement); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName( + emitStableDatabaseSemconv() + ? "BATCH INSERT batch_same_test.users" + : "DB Query") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_TYPE, "ipv4"), + equalTo(SERVER_ADDRESS, cassandraHost), + equalTo(SERVER_PORT, cassandraPort), + equalTo(NETWORK_PEER_ADDRESS, cassandraIp), + equalTo(NETWORK_PEER_PORT, cassandraPort), + equalTo(maybeStable(DB_SYSTEM), CASSANDRA), + equalTo( + maybeStable(DB_STATEMENT), + emitStableDatabaseSemconv() + ? "INSERT INTO batch_same_test.users (name, age) values (?, ?)" + : null), + equalTo( + DB_OPERATION_BATCH_SIZE, emitStableDatabaseSemconv() ? 2L : null), + equalTo( + DB_QUERY_SUMMARY, + emitStableDatabaseSemconv() + ? "BATCH INSERT batch_same_test.users" + : null)))); + } + + @Test + void batchStatementWithDifferentQueries() { + Session session = cluster.connect(); + cleanup.deferCleanup(session); + + session.execute("DROP KEYSPACE IF EXISTS batch_mixed_test"); + session.execute( + "CREATE KEYSPACE batch_mixed_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); + session.execute("CREATE TABLE batch_mixed_test.users ( name text PRIMARY KEY, age int )"); + PreparedStatement insertStatement = + session.prepare("INSERT INTO batch_mixed_test.users (name, age) values (?, ?)"); + testing.clearData(); + + BatchStatement batchStatement = + new BatchStatement() + .add(insertStatement.bind("alice", 1)) + .add( + new SimpleStatement( + "UPDATE batch_mixed_test.users SET age = 2 WHERE name = 'alice'")); + session.execute(batchStatement); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(emitStableDatabaseSemconv() ? "BATCH" : "DB Query") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_TYPE, "ipv4"), + equalTo(SERVER_ADDRESS, cassandraHost), + equalTo(SERVER_PORT, cassandraPort), + equalTo(NETWORK_PEER_ADDRESS, cassandraIp), + equalTo(NETWORK_PEER_PORT, cassandraPort), + equalTo(maybeStable(DB_SYSTEM), CASSANDRA), + equalTo( + maybeStable(DB_STATEMENT), + emitStableDatabaseSemconv() + ? "INSERT INTO batch_mixed_test.users (name, age) values (?, ?); UPDATE batch_mixed_test.users SET age = ? WHERE name = ?" + : null), + equalTo( + DB_OPERATION_BATCH_SIZE, emitStableDatabaseSemconv() ? 2L : null), + equalTo( + DB_QUERY_SUMMARY, emitStableDatabaseSemconv() ? "BATCH" : null)))); + } + private static Stream provideSyncParameters() { return Stream.of( Arguments.of( diff --git a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java index fd2b1dbeff91..f843506a6bad 100644 --- a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java +++ b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java @@ -5,19 +5,109 @@ package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0; +import static java.util.Collections.singleton; + +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BatchableStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.session.Session; import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; @AutoValue abstract class CassandraRequest { - static CassandraRequest create(Session session, String queryText, boolean parameterizedQuery) { - return new AutoValue_CassandraRequest(session, queryText, parameterizedQuery); + static CassandraRequest create(Session session, String queryText) { + return create(session, singleton(queryText), false, null, null); + } + + static CassandraRequest create(Session session, Statement statement) { + if (statement instanceof BatchStatement) { + return create(session, (BatchStatement) statement); + } + return create( + session, singleton(getQuery(statement)), statement instanceof BoundStatement, null, null); + } + + private static CassandraRequest create(Session session, BatchStatement batchStatement) { + List queryTexts = new ArrayList<>(); + List parameterizedQueries = null; + boolean allParameterized = true; + Boolean firstParameterizedQuery = null; + int queryIndex = 0; + for (BatchableStatement batchEntry : batchStatement) { + queryTexts.add(getQuery(batchEntry)); + boolean parameterizedQuery = batchEntry instanceof BoundStatement; + if (!parameterizedQuery) { + allParameterized = false; + } + if (firstParameterizedQuery == null) { + firstParameterizedQuery = parameterizedQuery; + } else if (parameterizedQuery != firstParameterizedQuery && parameterizedQueries == null) { + parameterizedQueries = new ArrayList<>(batchStatement.size()); + for (int previousQueryIndex = 0; previousQueryIndex < queryIndex; previousQueryIndex++) { + parameterizedQueries.add(firstParameterizedQuery); + } + } + if (parameterizedQueries != null) { + parameterizedQueries.add(parameterizedQuery); + } + queryIndex++; + } + boolean parameterizedQuery = allParameterized; + if (parameterizedQueries == null && firstParameterizedQuery != null) { + parameterizedQuery = firstParameterizedQuery; + } + return create( + session, + queryTexts, + parameterizedQuery, + parameterizedQueries, + Long.valueOf(batchStatement.size())); + } + + private static CassandraRequest create( + Session session, + Collection queryTexts, + boolean parameterizedQuery, + @Nullable List parameterizedQueries, + @Nullable Long batchSize) { + return new AutoValue_CassandraRequest( + session, queryTexts, parameterizedQuery, parameterizedQueries, batchSize); + } + + private static String getQuery(Statement statement) { + String query = null; + if (statement instanceof SimpleStatement) { + query = ((SimpleStatement) statement).getQuery(); + } else if (statement instanceof BoundStatement) { + query = ((BoundStatement) statement).getPreparedStatement().getQuery(); + } + + return query == null ? "" : query; } abstract Session getSession(); - abstract String getQueryText(); + abstract Collection getQueryTexts(); + + abstract boolean parameterizedQuery(); + + @Nullable + abstract List getParameterizedQueries(); + + boolean isParameterizedQuery(int queryIndex) { + List parameterizedQueries = getParameterizedQueries(); + return parameterizedQueries == null + ? parameterizedQuery() + : parameterizedQueries.get(queryIndex); + } - abstract boolean isParameterizedQuery(); + @Nullable + abstract Long getBatchSize(); } diff --git a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraSqlAttributesGetter.java b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraSqlAttributesGetter.java index b22fd7ad7979..7c3df7067adf 100644 --- a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraSqlAttributesGetter.java +++ b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraSqlAttributesGetter.java @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0; import static io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlDialect.DOUBLE_QUOTES_ARE_IDENTIFIERS; -import static java.util.Collections.singleton; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; @@ -42,7 +41,13 @@ public String getDbNamespace(CassandraRequest request) { @Override public Collection getRawQueryTexts(CassandraRequest request) { - return singleton(request.getQueryText()); + return request.getQueryTexts(); + } + + @Override + @Nullable + public Long getDbOperationBatchSize(CassandraRequest request) { + return request.getBatchSize(); } @Nullable @@ -63,7 +68,7 @@ public InetSocketAddress getNetworkPeerInetSocketAddress( } @Override - public boolean isParameterizedQuery(CassandraRequest request) { - return request.isParameterizedQuery(); + public boolean isParameterizedQuery(CassandraRequest request, int queryIndex) { + return request.isParameterizedQuery(queryIndex); } } diff --git a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/TracingCqlSession.java b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/TracingCqlSession.java index 31dc0f979520..ac07bd01e738 100644 --- a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/TracingCqlSession.java +++ b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/TracingCqlSession.java @@ -11,10 +11,8 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DriverException; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; -import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.cql.ResultSet; -import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.cql.Statement; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; @@ -67,7 +65,7 @@ static CqlSession wrapSession(CqlSession session) { } private static ResultSet execute(CqlSession session, String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); Context context = instrumenter().start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -81,9 +79,7 @@ private static ResultSet execute(CqlSession session, String query) { } private static ResultSet execute(CqlSession session, Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); Context context = instrumenter().start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -98,14 +94,12 @@ private static ResultSet execute(CqlSession session, Statement statement) { private static CompletionStage executeAsync( CqlSession session, Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); return executeAsync(request, () -> session.executeAsync(statement)); } private static CompletionStage executeAsync(CqlSession session, String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); return executeAsync(request, () -> session.executeAsync(query)); } @@ -144,17 +138,6 @@ private static CompletableFuture wrap(CompletionStage future, Context return result; } - private static String getQuery(Statement statement) { - String query = null; - if (statement instanceof SimpleStatement) { - query = ((SimpleStatement) statement).getQuery(); - } else if (statement instanceof BoundStatement) { - query = ((BoundStatement) statement).getPreparedStatement().getQuery(); - } - - return query == null ? "" : query; - } - @Nullable private static ExecutionInfo getExecutionInfo( @Nullable AsyncResultSet asyncResultSet, @Nullable Throwable throwable) { diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java index 6456cd92174e..1674f1e6c291 100644 --- a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java @@ -110,6 +110,8 @@ public void onEnd( Statement statement = (Statement) executionInfo.getRequest(); String consistencyLevel; + // getSession() is deprecated only because its visibility will be reduced in the future. + @SuppressWarnings("deprecation") DriverExecutionProfile config = request.getSession().getContext().getConfig().getDefaultProfile(); if (statement.getConsistencyLevel() != null) { diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java index 49be655774ab..333c1fb40bc4 100644 --- a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java @@ -5,20 +5,141 @@ package io.opentelemetry.instrumentation.cassandra.v4_4; +import static java.util.Collections.singleton; + +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BatchableStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.session.Session; import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; @AutoValue public abstract class CassandraRequest { + /** + * @deprecated use {@link #create(Session, String)} instead + */ + @Deprecated public static CassandraRequest create( Session session, String queryText, boolean parameterizedQuery) { - return new AutoValue_CassandraRequest(session, queryText, parameterizedQuery); + return create(session, singleton(queryText), parameterizedQuery, null, null); + } + + static CassandraRequest create(Session session, String queryText) { + return create(session, singleton(queryText), false, null, null); + } + + static CassandraRequest create(Session session, Statement statement) { + if (statement instanceof BatchStatement) { + return create(session, (BatchStatement) statement); + } + return create(session, getQuery(statement), statement instanceof BoundStatement); + } + + private static CassandraRequest create(Session session, BatchStatement batchStatement) { + List queryTexts = new ArrayList<>(); + List parameterizedQueries = null; + boolean allParameterized = true; + Boolean firstParameterizedQuery = null; + int queryIndex = 0; + for (BatchableStatement batchEntry : batchStatement) { + queryTexts.add(getQuery(batchEntry)); + boolean parameterizedQuery = batchEntry instanceof BoundStatement; + if (!parameterizedQuery) { + allParameterized = false; + } + if (firstParameterizedQuery == null) { + firstParameterizedQuery = parameterizedQuery; + } else if (parameterizedQuery != firstParameterizedQuery && parameterizedQueries == null) { + parameterizedQueries = new ArrayList<>(batchStatement.size()); + for (int previousQueryIndex = 0; previousQueryIndex < queryIndex; previousQueryIndex++) { + parameterizedQueries.add(firstParameterizedQuery); + } + } + if (parameterizedQueries != null) { + parameterizedQueries.add(parameterizedQuery); + } + queryIndex++; + } + boolean parameterizedQuery = allParameterized; + if (parameterizedQueries == null && firstParameterizedQuery != null) { + parameterizedQuery = firstParameterizedQuery; + } + return create( + session, + queryTexts, + parameterizedQuery, + parameterizedQueries, + Long.valueOf(batchStatement.size())); + } + + private static CassandraRequest create( + Session session, + Collection queryTexts, + boolean parameterizedQuery, + @Nullable List parameterizedQueries, + @Nullable Long batchSize) { + return new AutoValue_CassandraRequest( + session, queryTexts, parameterizedQuery, parameterizedQueries, batchSize); + } + + private static String getQuery(Statement statement) { + String query = null; + if (statement instanceof SimpleStatement) { + query = ((SimpleStatement) statement).getQuery(); + } else if (statement instanceof BoundStatement) { + query = ((BoundStatement) statement).getPreparedStatement().getQuery(); + } + + return query == null ? "" : query; } + /** + * @deprecated this method will be reduced to package-private visibility + */ + @Deprecated public abstract Session getSession(); - public abstract String getQueryText(); + abstract Collection getQueryTexts(); + + /** + * Returns the raw query text. + * + * @deprecated use {@link #getQueryTexts()} instead + */ + @Deprecated + public String getQueryText() { + return getQueryTexts().isEmpty() ? "" : getQueryTexts().iterator().next(); + } + + abstract boolean parameterizedQuery(); + + @Nullable + abstract List getParameterizedQueries(); + + /** + * Returns whether all queries in this request are parameterized. + * + * @deprecated use {@link #isParameterizedQuery(int)} instead + */ + @Deprecated + public boolean isParameterizedQuery() { + return parameterizedQuery(); + } + + boolean isParameterizedQuery(int queryIndex) { + List parameterizedQueries = getParameterizedQueries(); + return parameterizedQueries == null + ? parameterizedQuery() + : parameterizedQueries.get(queryIndex); + } - public abstract boolean isParameterizedQuery(); + @Nullable + abstract Long getBatchSize(); } diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java index 176e1d4e7ee4..c76b82965cf9 100644 --- a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java @@ -6,7 +6,6 @@ package io.opentelemetry.instrumentation.cassandra.v4_4; import static io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlDialect.DOUBLE_QUOTES_ARE_IDENTIFIERS; -import static java.util.Collections.singleton; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; @@ -36,6 +35,8 @@ public SqlDialect getSqlDialect(CassandraRequest request) { return DOUBLE_QUOTES_ARE_IDENTIFIERS; } + // getSession() is deprecated only because its visibility will be reduced in the future. + @SuppressWarnings("deprecation") @Override @Nullable public String getDbNamespace(CassandraRequest request) { @@ -44,7 +45,13 @@ public String getDbNamespace(CassandraRequest request) { @Override public Collection getRawQueryTexts(CassandraRequest request) { - return singleton(request.getQueryText()); + return request.getQueryTexts(); + } + + @Override + @Nullable + public Long getDbOperationBatchSize(CassandraRequest request) { + return request.getBatchSize(); } @Nullable @@ -67,7 +74,7 @@ public InetSocketAddress getNetworkPeerInetSocketAddress( } @Override - public boolean isParameterizedQuery(CassandraRequest request) { - return request.isParameterizedQuery(); + public boolean isParameterizedQuery(CassandraRequest request, int queryIndex) { + return request.isParameterizedQuery(queryIndex); } } diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java index 0028a27d50a8..10d5822370a6 100644 --- a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java @@ -12,10 +12,8 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DriverException; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; -import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.cql.ResultSet; -import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.cql.Statement; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; @@ -83,7 +81,7 @@ CqlSession wrapSession(CqlSession session) { } private ResultSet execute(CqlSession session, String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); Context context = instrumenter.start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -97,9 +95,7 @@ private ResultSet execute(CqlSession session, String query) { } private ResultSet execute(CqlSession session, Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); Context context = instrumenter.start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -113,14 +109,12 @@ private ResultSet execute(CqlSession session, Statement statement) { } private CompletionStage executeAsync(CqlSession session, Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); return executeAsync(request, () -> session.executeAsync(statement)); } private CompletionStage executeAsync(CqlSession session, String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); return executeAsync(request, () -> session.executeAsync(query)); } @@ -163,17 +157,6 @@ private static CompletableFuture wrap(CompletionStage future, Context return result; } - private static String getQuery(Statement statement) { - String query = null; - if (statement instanceof SimpleStatement) { - query = ((SimpleStatement) statement).getQuery(); - } else if (statement instanceof BoundStatement) { - query = ((BoundStatement) statement).getPreparedStatement().getQuery(); - } - - return query == null ? "" : query; - } - @Nullable private static ExecutionInfo getExecutionInfo( @Nullable AsyncResultSet asyncResultSet, @Nullable Throwable throwable) { diff --git a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java index d2ae4126d977..c2a06472d444 100644 --- a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java +++ b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java @@ -11,6 +11,7 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static io.opentelemetry.semconv.DbAttributes.DB_COLLECTION_NAME; +import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_BATCH_SIZE; import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_NAME; import static io.opentelemetry.semconv.DbAttributes.DB_QUERY_SUMMARY; import static io.opentelemetry.semconv.DbAttributes.DB_SYSTEM_NAME; @@ -37,6 +38,10 @@ import com.datastax.oss.driver.api.core.CqlSessionBuilder; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.DefaultBatchType; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; @@ -122,6 +127,129 @@ void testMetrics() { SERVER_PORT); } + @Test + void batchStatementWithSameQuery() { + CqlSession session = getSession(null); + cleanup.deferCleanup(session); + + session.execute("DROP KEYSPACE IF EXISTS batch_same_test"); + session.execute( + "CREATE KEYSPACE batch_same_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); + session.execute("CREATE TABLE batch_same_test.users ( name text PRIMARY KEY, age int )"); + PreparedStatement preparedStatement = + session.prepare("INSERT INTO batch_same_test.users (name, age) values (?, ?)"); + testing().clearData(); + + BatchStatement batchStatement = + BatchStatement.newInstance( + DefaultBatchType.LOGGED, + preparedStatement.bind("alice", 1), + preparedStatement.bind("bob", 2)); + session.execute(batchStatement); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName( + emitStableDatabaseSemconv() + ? "BATCH INSERT batch_same_test.users" + : "DB Query") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + satisfies(NETWORK_TYPE, val -> val.isIn("ipv4", "ipv6")), + equalTo(SERVER_ADDRESS, cassandraHost), + equalTo(SERVER_PORT, cassandraPort), + equalTo(NETWORK_PEER_ADDRESS, cassandraIp), + equalTo(NETWORK_PEER_PORT, cassandraPort), + equalTo(maybeStable(DB_SYSTEM), CASSANDRA), + equalTo( + maybeStable(DB_STATEMENT), + emitStableDatabaseSemconv() + ? "INSERT INTO batch_same_test.users (name, age) values (?, ?)" + : null), + equalTo( + DB_OPERATION_BATCH_SIZE, + emitStableDatabaseSemconv() ? 2L : null), + equalTo( + DB_QUERY_SUMMARY, + emitStableDatabaseSemconv() + ? "BATCH INSERT batch_same_test.users" + : null), + equalTo(maybeStable(DB_CASSANDRA_CONSISTENCY_LEVEL), "LOCAL_ONE"), + equalTo(maybeStable(DB_CASSANDRA_COORDINATOR_DC), "datacenter1"), + satisfies( + maybeStable(DB_CASSANDRA_COORDINATOR_ID), + val -> val.isInstanceOf(String.class)), + satisfies( + maybeStable(DB_CASSANDRA_IDEMPOTENCE), + val -> val.isInstanceOf(Boolean.class)), + equalTo(maybeStable(DB_CASSANDRA_PAGE_SIZE), 5000), + equalTo( + maybeStable(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT), 0)))); + } + + @Test + void batchStatementWithDifferentQueries() { + CqlSession session = getSession(null); + cleanup.deferCleanup(session); + + session.execute("DROP KEYSPACE IF EXISTS batch_mixed_test"); + session.execute( + "CREATE KEYSPACE batch_mixed_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); + session.execute("CREATE TABLE batch_mixed_test.users ( name text PRIMARY KEY, age int )"); + PreparedStatement insertStatement = + session.prepare("INSERT INTO batch_mixed_test.users (name, age) values (?, ?)"); + testing().clearData(); + + BatchStatement batchStatement = + BatchStatement.newInstance( + DefaultBatchType.LOGGED, + insertStatement.bind("alice", 1), + SimpleStatement.newInstance( + "UPDATE batch_mixed_test.users SET age = 2 WHERE name = 'alice'")); + session.execute(batchStatement); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(emitStableDatabaseSemconv() ? "BATCH" : "DB Query") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + satisfies(NETWORK_TYPE, val -> val.isIn("ipv4", "ipv6")), + equalTo(SERVER_ADDRESS, cassandraHost), + equalTo(SERVER_PORT, cassandraPort), + equalTo(NETWORK_PEER_ADDRESS, cassandraIp), + equalTo(NETWORK_PEER_PORT, cassandraPort), + equalTo(maybeStable(DB_SYSTEM), CASSANDRA), + equalTo( + maybeStable(DB_STATEMENT), + emitStableDatabaseSemconv() + ? "INSERT INTO batch_mixed_test.users (name, age) values (?, ?); UPDATE batch_mixed_test.users SET age = ? WHERE name = ?" + : null), + equalTo( + DB_OPERATION_BATCH_SIZE, + emitStableDatabaseSemconv() ? 2L : null), + equalTo( + DB_QUERY_SUMMARY, emitStableDatabaseSemconv() ? "BATCH" : null), + equalTo(maybeStable(DB_CASSANDRA_CONSISTENCY_LEVEL), "LOCAL_ONE"), + equalTo(maybeStable(DB_CASSANDRA_COORDINATOR_DC), "datacenter1"), + satisfies( + maybeStable(DB_CASSANDRA_COORDINATOR_ID), + val -> val.isInstanceOf(String.class)), + satisfies( + maybeStable(DB_CASSANDRA_IDEMPOTENCE), + val -> val.isInstanceOf(Boolean.class)), + equalTo(maybeStable(DB_CASSANDRA_PAGE_SIZE), 5000), + equalTo( + maybeStable(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT), 0)))); + } + @ParameterizedTest(name = "{index}: {0}") @MethodSource("provideSyncParameters") void syncTest(Parameter parameter) { diff --git a/instrumentation/jdbc/library/src/main/java/io/opentelemetry/instrumentation/jdbc/internal/JdbcAttributesGetter.java b/instrumentation/jdbc/library/src/main/java/io/opentelemetry/instrumentation/jdbc/internal/JdbcAttributesGetter.java index e225180e2edf..0fd7e2b551dc 100644 --- a/instrumentation/jdbc/library/src/main/java/io/opentelemetry/instrumentation/jdbc/internal/JdbcAttributesGetter.java +++ b/instrumentation/jdbc/library/src/main/java/io/opentelemetry/instrumentation/jdbc/internal/JdbcAttributesGetter.java @@ -134,7 +134,8 @@ public Map getDbQueryParameters(DbRequest request) { } @Override - public boolean isParameterizedQuery(DbRequest request) { + public boolean isParameterizedQuery(DbRequest request, int queryIndex) { + // JDBC does not support mixed parameterization within a single request. return request.isParameterizedQuery(); } diff --git a/instrumentation/r2dbc-1.0/library/src/main/java/io/opentelemetry/instrumentation/r2dbc/v1_0/internal/R2dbcSqlAttributesGetter.java b/instrumentation/r2dbc-1.0/library/src/main/java/io/opentelemetry/instrumentation/r2dbc/v1_0/internal/R2dbcSqlAttributesGetter.java index fe8f13275bdf..da4ac1d66c5d 100644 --- a/instrumentation/r2dbc-1.0/library/src/main/java/io/opentelemetry/instrumentation/r2dbc/v1_0/internal/R2dbcSqlAttributesGetter.java +++ b/instrumentation/r2dbc-1.0/library/src/main/java/io/opentelemetry/instrumentation/r2dbc/v1_0/internal/R2dbcSqlAttributesGetter.java @@ -90,7 +90,8 @@ public Integer getServerPort(DbExecution request) { } @Override - public boolean isParameterizedQuery(DbExecution request) { + public boolean isParameterizedQuery(DbExecution request, int queryIndex) { + // R2DBC does not support mixed parameterization within a single request. return request.isParameterizedQuery(); } } diff --git a/instrumentation/vertx/vertx-sql-client/vertx-sql-client-common-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/sqlclient/common/v4_0/VertxSqlClientAttributesGetter.java b/instrumentation/vertx/vertx-sql-client/vertx-sql-client-common-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/sqlclient/common/v4_0/VertxSqlClientAttributesGetter.java index e95ae357cd06..37e1479eb3b3 100644 --- a/instrumentation/vertx/vertx-sql-client/vertx-sql-client-common-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/sqlclient/common/v4_0/VertxSqlClientAttributesGetter.java +++ b/instrumentation/vertx/vertx-sql-client/vertx-sql-client-common-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/sqlclient/common/v4_0/VertxSqlClientAttributesGetter.java @@ -98,7 +98,8 @@ public String getErrorType( } @Override - public boolean isParameterizedQuery(VertxSqlClientRequest request) { + public boolean isParameterizedQuery(VertxSqlClientRequest request, int queryIndex) { + // Vert.x SQL client does not support mixed parameterization within a single request. return request.isParameterizedQuery(); } From 44cf6f184f530ebec550be81f1ad0c1f103eadf8 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Thu, 11 Jun 2026 10:18:40 -0700 Subject: [PATCH 02/10] Remove incorrect comment --- .../instrumentation/cassandra/v4_4/CassandraRequest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java index 333c1fb40bc4..d272029076e3 100644 --- a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java @@ -22,9 +22,6 @@ @AutoValue public abstract class CassandraRequest { - /** - * @deprecated use {@link #create(Session, String)} instead - */ @Deprecated public static CassandraRequest create( Session session, String queryText, boolean parameterizedQuery) { From e671217fc6aff5395081dbfcd50d7d588fb36d9a Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Thu, 11 Jun 2026 10:29:15 -0700 Subject: [PATCH 03/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../instrumentation/cassandra/v4_4/CassandraRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java index d272029076e3..e4f0c0b28d77 100644 --- a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java @@ -36,7 +36,7 @@ static CassandraRequest create(Session session, Statement statement) { if (statement instanceof BatchStatement) { return create(session, (BatchStatement) statement); } - return create(session, getQuery(statement), statement instanceof BoundStatement); + return create(session, singleton(getQuery(statement)), statement instanceof BoundStatement, null, null); } private static CassandraRequest create(Session session, BatchStatement batchStatement) { From bb978faa1266fee52ffae5bfa9e47803a8fab839 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Thu, 11 Jun 2026 10:36:38 -0700 Subject: [PATCH 04/10] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../instrumentation/api/incubator/semconv/db/MultiQuery.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java index 29479d915f1b..277fbd84f2d9 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java @@ -56,7 +56,7 @@ static class Builder { private final Set uniqueQueryTexts = new LinkedHashSet<>(); private final UniqueValue uniqueQuerySummary = new UniqueValue(); - void add(SqlQuery analyzedQuery, @Nullable String queryText) { + void add(SqlQuery analyzedQuery, String queryText) { uniqueStoredProcedureName.set(analyzedQuery.getStoredProcedureName()); uniqueQueryTexts.add(queryText); uniqueQuerySummary.set(analyzedQuery.getQuerySummary()); From 49e75eb6e3efa0e81d90f0e4ab3d0c6fecde85d0 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Thu, 11 Jun 2026 10:58:19 -0700 Subject: [PATCH 05/10] Address review comment from Copilot: align batch network type semconv --- .../instrumentation/cassandra/v3_0/CassandraClientTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java index 1f9b006b923d..5f21aeed6f62 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java @@ -325,7 +325,7 @@ void batchStatementWithSameQuery() { .hasKind(SpanKind.CLIENT) .hasNoParent() .hasAttributesSatisfyingExactly( - equalTo(NETWORK_TYPE, "ipv4"), + equalTo(NETWORK_TYPE, emitStableDatabaseSemconv() ? null : "ipv4"), equalTo(SERVER_ADDRESS, cassandraHost), equalTo(SERVER_PORT, cassandraPort), equalTo(NETWORK_PEER_ADDRESS, cassandraIp), From 91d3530bed478b12369973abd5333c6c2523fd6b Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Thu, 11 Jun 2026 10:59:17 -0700 Subject: [PATCH 06/10] Address review comment from Copilot: align mixed batch network type semconv --- .../api/incubator/semconv/db/MultiQuery.java | 2 +- .../db/SqlClientAttributesExtractorTest.java | 6 +- .../cassandra/v3_0/CassandraRequest.java | 45 +++++------ .../cassandra/v3_0/CassandraClientTest.java | 8 +- .../cassandra/v4_0/CassandraRequest.java | 62 +++++++++------ .../cassandra/v4_4/CassandraRequest.java | 69 ++++++++++------ .../common/v4_0/AbstractCassandraTest.java | 79 +++++++++++++++++-- 7 files changed, 185 insertions(+), 86 deletions(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java index 277fbd84f2d9..29479d915f1b 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java @@ -56,7 +56,7 @@ static class Builder { private final Set uniqueQueryTexts = new LinkedHashSet<>(); private final UniqueValue uniqueQuerySummary = new UniqueValue(); - void add(SqlQuery analyzedQuery, String queryText) { + void add(SqlQuery analyzedQuery, @Nullable String queryText) { uniqueStoredProcedureName.set(analyzedQuery.getStoredProcedureName()); uniqueQueryTexts.add(queryText); uniqueQuerySummary.set(analyzedQuery.getQuerySummary()); diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java index 8d37a5328e18..20723c66bc49 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java @@ -369,7 +369,7 @@ void shouldExtractMixedParameterizedMultiQueryBatchAttributes() { request.put("db.namespace", "potatoes"); request.put( "db.query.texts", - asList("INSERT INTO potato VALUES(?)", "UPDATE potato SET name='bob' WHERE id=1")); + asList("INSERT INTO potato VALUES('alice', ?)", "UPDATE potato SET name='bob' WHERE id=1")); request.put("db.query.parameterized", asList(true, false)); request.put(DB_OPERATION_BATCH_SIZE.getKey(), 2L); @@ -393,7 +393,7 @@ void shouldExtractMixedParameterizedMultiQueryBatchAttributes() { entry(DB_NAMESPACE, "potatoes"), entry( DB_QUERY_TEXT, - "INSERT INTO potato VALUES(?); UPDATE potato SET name=? WHERE id=?"), + "INSERT INTO potato VALUES('alice', ?); UPDATE potato SET name=? WHERE id=?"), entry(DB_QUERY_SUMMARY, "BATCH"), entry(DB_OPERATION_BATCH_SIZE, 2L)); } else if (emitOldDatabaseSemconv()) { @@ -404,7 +404,7 @@ void shouldExtractMixedParameterizedMultiQueryBatchAttributes() { entry(DB_NAMESPACE, "potatoes"), entry( DB_QUERY_TEXT, - "INSERT INTO potato VALUES(?); UPDATE potato SET name=? WHERE id=?"), + "INSERT INTO potato VALUES('alice', ?); UPDATE potato SET name=? WHERE id=?"), entry(DB_QUERY_SUMMARY, "BATCH"), entry(DB_OPERATION_BATCH_SIZE, 2L)); } diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java index b65012b17ee1..90b21bd95ad5 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java @@ -39,49 +39,50 @@ static CassandraRequest create(Session session, Statement statement) { private static CassandraRequest create(Session session, BatchStatement batchStatement) { List queryTexts = new ArrayList<>(); - List parameterizedQueries = null; - boolean allParameterized = true; + List mixedParameterizedQueries = null; + boolean allQueriesParameterized = true; Boolean firstParameterizedQuery = null; int queryIndex = 0; for (Statement batchEntry : batchStatement.getStatements()) { queryTexts.add(getQuery(batchEntry)); boolean parameterizedQuery = batchEntry instanceof BoundStatement; if (!parameterizedQuery) { - allParameterized = false; + allQueriesParameterized = false; } if (firstParameterizedQuery == null) { firstParameterizedQuery = parameterizedQuery; - } else if (parameterizedQuery != firstParameterizedQuery && parameterizedQueries == null) { - parameterizedQueries = new ArrayList<>(batchStatement.size()); + } else if (parameterizedQuery != firstParameterizedQuery + && mixedParameterizedQueries == null) { + mixedParameterizedQueries = new ArrayList<>(batchStatement.size()); for (int previousQueryIndex = 0; previousQueryIndex < queryIndex; previousQueryIndex++) { - parameterizedQueries.add(firstParameterizedQuery); + mixedParameterizedQueries.add(firstParameterizedQuery); } } - if (parameterizedQueries != null) { - parameterizedQueries.add(parameterizedQuery); + if (mixedParameterizedQueries != null) { + mixedParameterizedQueries.add(parameterizedQuery); } queryIndex++; } - boolean parameterizedQuery = allParameterized; - if (parameterizedQueries == null && firstParameterizedQuery != null) { - parameterizedQuery = firstParameterizedQuery; + boolean allQueriesParameterizedResult = allQueriesParameterized; + if (mixedParameterizedQueries == null && firstParameterizedQuery != null) { + allQueriesParameterizedResult = firstParameterizedQuery; } return create( session, queryTexts, - parameterizedQuery, - parameterizedQueries, + allQueriesParameterizedResult, + mixedParameterizedQueries, Long.valueOf(batchStatement.size())); } private static CassandraRequest create( Session session, Collection queryTexts, - boolean parameterizedQuery, - @Nullable List parameterizedQueries, + boolean allQueriesParameterized, + @Nullable List mixedParameterizedQueries, @Nullable Long batchSize) { return new AutoValue_CassandraRequest( - session, queryTexts, parameterizedQuery, parameterizedQueries, batchSize); + session, queryTexts, allQueriesParameterized, mixedParameterizedQueries, batchSize); } private static String getQuery(Statement statement) { @@ -99,16 +100,16 @@ private static String getQuery(Statement statement) { abstract Collection getQueryTexts(); - abstract boolean parameterizedQuery(); + abstract boolean allQueriesParameterized(); @Nullable - abstract List getParameterizedQueries(); + abstract List mixedParameterizedQueries(); boolean isParameterizedQuery(int queryIndex) { - List parameterizedQueries = getParameterizedQueries(); - return parameterizedQueries == null - ? parameterizedQuery() - : parameterizedQueries.get(queryIndex); + List mixedParameterizedQueries = mixedParameterizedQueries(); + return mixedParameterizedQueries == null + ? allQueriesParameterized() + : mixedParameterizedQueries.get(queryIndex); } @Nullable diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java index 5f21aeed6f62..72faffc7f825 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java @@ -355,12 +355,12 @@ void batchStatementWithDifferentQueries() { "CREATE KEYSPACE batch_mixed_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); session.execute("CREATE TABLE batch_mixed_test.users ( name text PRIMARY KEY, age int )"); PreparedStatement insertStatement = - session.prepare("INSERT INTO batch_mixed_test.users (name, age) values (?, ?)"); + session.prepare("INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?)"); testing.clearData(); BatchStatement batchStatement = new BatchStatement() - .add(insertStatement.bind("alice", 1)) + .add(insertStatement.bind(1)) .add( new SimpleStatement( "UPDATE batch_mixed_test.users SET age = 2 WHERE name = 'alice'")); @@ -374,7 +374,7 @@ void batchStatementWithDifferentQueries() { .hasKind(SpanKind.CLIENT) .hasNoParent() .hasAttributesSatisfyingExactly( - equalTo(NETWORK_TYPE, "ipv4"), + equalTo(NETWORK_TYPE, emitStableDatabaseSemconv() ? null : "ipv4"), equalTo(SERVER_ADDRESS, cassandraHost), equalTo(SERVER_PORT, cassandraPort), equalTo(NETWORK_PEER_ADDRESS, cassandraIp), @@ -383,7 +383,7 @@ void batchStatementWithDifferentQueries() { equalTo( maybeStable(DB_STATEMENT), emitStableDatabaseSemconv() - ? "INSERT INTO batch_mixed_test.users (name, age) values (?, ?); UPDATE batch_mixed_test.users SET age = ? WHERE name = ?" + ? "INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?); UPDATE batch_mixed_test.users SET age = ? WHERE name = ?" : null), equalTo( DB_OPERATION_BATCH_SIZE, emitStableDatabaseSemconv() ? 2L : null), diff --git a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java index f843506a6bad..d0920565c80b 100644 --- a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java +++ b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java @@ -30,55 +30,55 @@ static CassandraRequest create(Session session, Statement statement) { if (statement instanceof BatchStatement) { return create(session, (BatchStatement) statement); } - return create( - session, singleton(getQuery(statement)), statement instanceof BoundStatement, null, null); + return create(session, singleton(getQuery(statement)), hasQueryValues(statement), null, null); } private static CassandraRequest create(Session session, BatchStatement batchStatement) { List queryTexts = new ArrayList<>(); - List parameterizedQueries = null; - boolean allParameterized = true; + List mixedParameterizedQueries = null; + boolean allQueriesParameterized = true; Boolean firstParameterizedQuery = null; int queryIndex = 0; for (BatchableStatement batchEntry : batchStatement) { queryTexts.add(getQuery(batchEntry)); - boolean parameterizedQuery = batchEntry instanceof BoundStatement; + boolean parameterizedQuery = hasQueryValues(batchEntry); if (!parameterizedQuery) { - allParameterized = false; + allQueriesParameterized = false; } if (firstParameterizedQuery == null) { firstParameterizedQuery = parameterizedQuery; - } else if (parameterizedQuery != firstParameterizedQuery && parameterizedQueries == null) { - parameterizedQueries = new ArrayList<>(batchStatement.size()); + } else if (parameterizedQuery != firstParameterizedQuery + && mixedParameterizedQueries == null) { + mixedParameterizedQueries = new ArrayList<>(batchStatement.size()); for (int previousQueryIndex = 0; previousQueryIndex < queryIndex; previousQueryIndex++) { - parameterizedQueries.add(firstParameterizedQuery); + mixedParameterizedQueries.add(firstParameterizedQuery); } } - if (parameterizedQueries != null) { - parameterizedQueries.add(parameterizedQuery); + if (mixedParameterizedQueries != null) { + mixedParameterizedQueries.add(parameterizedQuery); } queryIndex++; } - boolean parameterizedQuery = allParameterized; - if (parameterizedQueries == null && firstParameterizedQuery != null) { - parameterizedQuery = firstParameterizedQuery; + boolean allQueriesParameterizedResult = allQueriesParameterized; + if (mixedParameterizedQueries == null && firstParameterizedQuery != null) { + allQueriesParameterizedResult = firstParameterizedQuery; } return create( session, queryTexts, - parameterizedQuery, - parameterizedQueries, + allQueriesParameterizedResult, + mixedParameterizedQueries, Long.valueOf(batchStatement.size())); } private static CassandraRequest create( Session session, Collection queryTexts, - boolean parameterizedQuery, - @Nullable List parameterizedQueries, + boolean allQueriesParameterized, + @Nullable List mixedParameterizedQueries, @Nullable Long batchSize) { return new AutoValue_CassandraRequest( - session, queryTexts, parameterizedQuery, parameterizedQueries, batchSize); + session, queryTexts, allQueriesParameterized, mixedParameterizedQueries, batchSize); } private static String getQuery(Statement statement) { @@ -92,20 +92,32 @@ private static String getQuery(Statement statement) { return query == null ? "" : query; } + private static boolean hasQueryValues(Statement statement) { + if (statement instanceof BoundStatement) { + return true; + } + if (statement instanceof SimpleStatement) { + SimpleStatement simpleStatement = (SimpleStatement) statement; + return !simpleStatement.getPositionalValues().isEmpty() + || !simpleStatement.getNamedValues().isEmpty(); + } + return false; + } + abstract Session getSession(); abstract Collection getQueryTexts(); - abstract boolean parameterizedQuery(); + abstract boolean allQueriesParameterized(); @Nullable - abstract List getParameterizedQueries(); + abstract List mixedParameterizedQueries(); boolean isParameterizedQuery(int queryIndex) { - List parameterizedQueries = getParameterizedQueries(); - return parameterizedQueries == null - ? parameterizedQuery() - : parameterizedQueries.get(queryIndex); + List mixedParameterizedQueries = mixedParameterizedQueries(); + return mixedParameterizedQueries == null + ? allQueriesParameterized() + : mixedParameterizedQueries.get(queryIndex); } @Nullable diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java index e4f0c0b28d77..629511b1aa70 100644 --- a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java @@ -36,54 +36,55 @@ static CassandraRequest create(Session session, Statement statement) { if (statement instanceof BatchStatement) { return create(session, (BatchStatement) statement); } - return create(session, singleton(getQuery(statement)), statement instanceof BoundStatement, null, null); + return create(session, singleton(getQuery(statement)), hasQueryValues(statement), null, null); } private static CassandraRequest create(Session session, BatchStatement batchStatement) { List queryTexts = new ArrayList<>(); - List parameterizedQueries = null; - boolean allParameterized = true; + List mixedParameterizedQueries = null; + boolean allQueriesParameterized = true; Boolean firstParameterizedQuery = null; int queryIndex = 0; for (BatchableStatement batchEntry : batchStatement) { queryTexts.add(getQuery(batchEntry)); - boolean parameterizedQuery = batchEntry instanceof BoundStatement; + boolean parameterizedQuery = hasQueryValues(batchEntry); if (!parameterizedQuery) { - allParameterized = false; + allQueriesParameterized = false; } if (firstParameterizedQuery == null) { firstParameterizedQuery = parameterizedQuery; - } else if (parameterizedQuery != firstParameterizedQuery && parameterizedQueries == null) { - parameterizedQueries = new ArrayList<>(batchStatement.size()); + } else if (parameterizedQuery != firstParameterizedQuery + && mixedParameterizedQueries == null) { + mixedParameterizedQueries = new ArrayList<>(batchStatement.size()); for (int previousQueryIndex = 0; previousQueryIndex < queryIndex; previousQueryIndex++) { - parameterizedQueries.add(firstParameterizedQuery); + mixedParameterizedQueries.add(firstParameterizedQuery); } } - if (parameterizedQueries != null) { - parameterizedQueries.add(parameterizedQuery); + if (mixedParameterizedQueries != null) { + mixedParameterizedQueries.add(parameterizedQuery); } queryIndex++; } - boolean parameterizedQuery = allParameterized; - if (parameterizedQueries == null && firstParameterizedQuery != null) { - parameterizedQuery = firstParameterizedQuery; + boolean allQueriesParameterizedResult = allQueriesParameterized; + if (mixedParameterizedQueries == null && firstParameterizedQuery != null) { + allQueriesParameterizedResult = firstParameterizedQuery; } return create( session, queryTexts, - parameterizedQuery, - parameterizedQueries, + allQueriesParameterizedResult, + mixedParameterizedQueries, Long.valueOf(batchStatement.size())); } private static CassandraRequest create( Session session, Collection queryTexts, - boolean parameterizedQuery, - @Nullable List parameterizedQueries, + boolean allQueriesParameterized, + @Nullable List mixedParameterizedQueries, @Nullable Long batchSize) { return new AutoValue_CassandraRequest( - session, queryTexts, parameterizedQuery, parameterizedQueries, batchSize); + session, queryTexts, allQueriesParameterized, mixedParameterizedQueries, batchSize); } private static String getQuery(Statement statement) { @@ -97,6 +98,18 @@ private static String getQuery(Statement statement) { return query == null ? "" : query; } + private static boolean hasQueryValues(Statement statement) { + if (statement instanceof BoundStatement) { + return true; + } + if (statement instanceof SimpleStatement) { + SimpleStatement simpleStatement = (SimpleStatement) statement; + return !simpleStatement.getPositionalValues().isEmpty() + || !simpleStatement.getNamedValues().isEmpty(); + } + return false; + } + /** * @deprecated this method will be reduced to package-private visibility */ @@ -112,13 +125,17 @@ private static String getQuery(Statement statement) { */ @Deprecated public String getQueryText() { - return getQueryTexts().isEmpty() ? "" : getQueryTexts().iterator().next(); + if (getBatchSize() != null) { + // Preserve previous public API behavior: BatchStatement query text was not captured. + return ""; + } + return getQueryTexts().iterator().next(); } - abstract boolean parameterizedQuery(); + abstract boolean allQueriesParameterized(); @Nullable - abstract List getParameterizedQueries(); + abstract List mixedParameterizedQueries(); /** * Returns whether all queries in this request are parameterized. @@ -127,14 +144,14 @@ public String getQueryText() { */ @Deprecated public boolean isParameterizedQuery() { - return parameterizedQuery(); + return allQueriesParameterized(); } boolean isParameterizedQuery(int queryIndex) { - List parameterizedQueries = getParameterizedQueries(); - return parameterizedQueries == null - ? parameterizedQuery() - : parameterizedQueries.get(queryIndex); + List mixedParameterizedQueries = mixedParameterizedQueries(); + return mixedParameterizedQueries == null + ? allQueriesParameterized() + : mixedParameterizedQueries.get(queryIndex); } @Nullable diff --git a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java index c2a06472d444..40ede18237dc 100644 --- a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java +++ b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java @@ -127,6 +127,67 @@ void testMetrics() { SERVER_PORT); } + @Test + void simpleStatementWithValues() { + CqlSession session = getSession(null); + cleanup.deferCleanup(session); + + session.execute("DROP KEYSPACE IF EXISTS simple_values_test"); + session.execute( + "CREATE KEYSPACE simple_values_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); + session.execute("CREATE TABLE simple_values_test.users ( name text PRIMARY KEY, age int )"); + testing().clearData(); + + session.execute( + SimpleStatement.newInstance( + "INSERT INTO simple_values_test.users (name, age) values ('alice', ?)", 1)); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("INSERT simple_values_test.users") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + satisfies( + NETWORK_TYPE, + emitStableDatabaseSemconv() + ? val -> val.isNull() + : val -> val.isIn("ipv4", "ipv6")), + equalTo(SERVER_ADDRESS, cassandraHost), + equalTo(SERVER_PORT, cassandraPort), + equalTo(NETWORK_PEER_ADDRESS, cassandraIp), + equalTo(NETWORK_PEER_PORT, cassandraPort), + equalTo(maybeStable(DB_SYSTEM), CASSANDRA), + equalTo( + maybeStable(DB_STATEMENT), + emitStableDatabaseSemconv() + ? "INSERT INTO simple_values_test.users (name, age) values ('alice', ?)" + : "INSERT INTO simple_values_test.users (name, age) values (?, ?)"), + equalTo( + DB_QUERY_SUMMARY, + emitStableDatabaseSemconv() + ? "INSERT simple_values_test.users" + : null), + equalTo(maybeStable(DB_OPERATION), "INSERT"), + equalTo(maybeStable(DB_CASSANDRA_CONSISTENCY_LEVEL), "LOCAL_ONE"), + equalTo(maybeStable(DB_CASSANDRA_COORDINATOR_DC), "datacenter1"), + satisfies( + maybeStable(DB_CASSANDRA_COORDINATOR_ID), + val -> val.isInstanceOf(String.class)), + satisfies( + maybeStable(DB_CASSANDRA_IDEMPOTENCE), + val -> val.isInstanceOf(Boolean.class)), + equalTo(maybeStable(DB_CASSANDRA_PAGE_SIZE), 5000), + equalTo( + maybeStable(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT), 0), + equalTo( + maybeStable(DB_CASSANDRA_TABLE), + "simple_values_test.users")))); + } + @Test void batchStatementWithSameQuery() { CqlSession session = getSession(null); @@ -159,7 +220,11 @@ void batchStatementWithSameQuery() { .hasKind(SpanKind.CLIENT) .hasNoParent() .hasAttributesSatisfyingExactly( - satisfies(NETWORK_TYPE, val -> val.isIn("ipv4", "ipv6")), + satisfies( + NETWORK_TYPE, + emitStableDatabaseSemconv() + ? val -> val.isNull() + : val -> val.isIn("ipv4", "ipv6")), equalTo(SERVER_ADDRESS, cassandraHost), equalTo(SERVER_PORT, cassandraPort), equalTo(NETWORK_PEER_ADDRESS, cassandraIp), @@ -201,13 +266,13 @@ void batchStatementWithDifferentQueries() { "CREATE KEYSPACE batch_mixed_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); session.execute("CREATE TABLE batch_mixed_test.users ( name text PRIMARY KEY, age int )"); PreparedStatement insertStatement = - session.prepare("INSERT INTO batch_mixed_test.users (name, age) values (?, ?)"); + session.prepare("INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?)"); testing().clearData(); BatchStatement batchStatement = BatchStatement.newInstance( DefaultBatchType.LOGGED, - insertStatement.bind("alice", 1), + insertStatement.bind(1), SimpleStatement.newInstance( "UPDATE batch_mixed_test.users SET age = 2 WHERE name = 'alice'")); session.execute(batchStatement); @@ -221,7 +286,11 @@ void batchStatementWithDifferentQueries() { .hasKind(SpanKind.CLIENT) .hasNoParent() .hasAttributesSatisfyingExactly( - satisfies(NETWORK_TYPE, val -> val.isIn("ipv4", "ipv6")), + satisfies( + NETWORK_TYPE, + emitStableDatabaseSemconv() + ? val -> val.isNull() + : val -> val.isIn("ipv4", "ipv6")), equalTo(SERVER_ADDRESS, cassandraHost), equalTo(SERVER_PORT, cassandraPort), equalTo(NETWORK_PEER_ADDRESS, cassandraIp), @@ -230,7 +299,7 @@ void batchStatementWithDifferentQueries() { equalTo( maybeStable(DB_STATEMENT), emitStableDatabaseSemconv() - ? "INSERT INTO batch_mixed_test.users (name, age) values (?, ?); UPDATE batch_mixed_test.users SET age = ? WHERE name = ?" + ? "INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?); UPDATE batch_mixed_test.users SET age = ? WHERE name = ?" : null), equalTo( DB_OPERATION_BATCH_SIZE, From a4a583b19861e067cae058763061175fcf3f6bf3 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Thu, 11 Jun 2026 21:40:00 -0700 Subject: [PATCH 07/10] spotless --- .../cassandra/common/v4_0/AbstractCassandraTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java index 40ede18237dc..b7b26ab44cc0 100644 --- a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java +++ b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java @@ -181,11 +181,9 @@ void simpleStatementWithValues() { maybeStable(DB_CASSANDRA_IDEMPOTENCE), val -> val.isInstanceOf(Boolean.class)), equalTo(maybeStable(DB_CASSANDRA_PAGE_SIZE), 5000), + equalTo(maybeStable(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT), 0), equalTo( - maybeStable(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT), 0), - equalTo( - maybeStable(DB_CASSANDRA_TABLE), - "simple_values_test.users")))); + maybeStable(DB_CASSANDRA_TABLE), "simple_values_test.users")))); } @Test From 3e74a132e80403a0107e4216ff2cbccee1bd11a6 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Fri, 12 Jun 2026 07:44:01 -0700 Subject: [PATCH 08/10] Address Cassandra batch trace wait review comment --- .../instrumentation/cassandra/v3_0/CassandraClientTest.java | 2 ++ .../cassandra/common/v4_0/AbstractCassandraTest.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java index 72faffc7f825..3ace81a4adc9 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java @@ -306,6 +306,7 @@ void batchStatementWithSameQuery() { session.execute("CREATE TABLE batch_same_test.users ( name text PRIMARY KEY, age int )"); PreparedStatement preparedStatement = session.prepare("INSERT INTO batch_same_test.users (name, age) values (?, ?)"); + testing.waitForTraces(4); testing.clearData(); BatchStatement batchStatement = @@ -356,6 +357,7 @@ void batchStatementWithDifferentQueries() { session.execute("CREATE TABLE batch_mixed_test.users ( name text PRIMARY KEY, age int )"); PreparedStatement insertStatement = session.prepare("INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?)"); + testing.waitForTraces(4); testing.clearData(); BatchStatement batchStatement = diff --git a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java index b7b26ab44cc0..3da7f2e41e36 100644 --- a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java +++ b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java @@ -197,6 +197,7 @@ void batchStatementWithSameQuery() { session.execute("CREATE TABLE batch_same_test.users ( name text PRIMARY KEY, age int )"); PreparedStatement preparedStatement = session.prepare("INSERT INTO batch_same_test.users (name, age) values (?, ?)"); + testing().waitForTraces(4); testing().clearData(); BatchStatement batchStatement = @@ -265,6 +266,7 @@ void batchStatementWithDifferentQueries() { session.execute("CREATE TABLE batch_mixed_test.users ( name text PRIMARY KEY, age int )"); PreparedStatement insertStatement = session.prepare("INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?)"); + testing().waitForTraces(4); testing().clearData(); BatchStatement batchStatement = From 9dba76e62ebc3308933cd2577915b8d741a41662 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Fri, 12 Jun 2026 08:45:47 -0700 Subject: [PATCH 09/10] Stabilize Cassandra batch and Pulsar multi-topic tests Cassandra batch setup only emits traces for the three executed schema statements; preparing the batch statement does not create a trace, so the tests timed out while draining four traces before the real assertion. Pulsar latest dependencies can emit extra receive-only spans for multi-topic consumption, and one-message batch receive spans may report batch count instead of an individual message id. The test now asserts the required publish, receive, and process spans directly instead of relying on an exact trace count. Validation: .\gradlew.bat :instrumentation:cassandra:cassandra-3.0:javaagent:test --tests io.opentelemetry.javaagent.instrumentation.cassandra.v3_0.CassandraClientTest.batchStatementWithSameQuery --tests io.opentelemetry.javaagent.instrumentation.cassandra.v3_0.CassandraClientTest.batchStatementWithDifferentQueries :instrumentation:cassandra:cassandra-4.0:javaagent:test --tests io.opentelemetry.javaagent.instrumentation.cassandra.v4_0.CassandraTest.batchStatementWithSameQuery --tests io.opentelemetry.javaagent.instrumentation.cassandra.v4_0.CassandraTest.batchStatementWithDifferentQueries :instrumentation:cassandra:cassandra-4.4:javaagent:test --tests io.opentelemetry.javaagent.instrumentation.cassandra.v4_4.CassandraTest.batchStatementWithSameQuery --tests io.opentelemetry.javaagent.instrumentation.cassandra.v4_4.CassandraTest.batchStatementWithDifferentQueries :instrumentation:cassandra:cassandra-4.4:library:test --tests io.opentelemetry.instrumentation.cassandra.v4_4.CassandraTest.batchStatementWithSameQuery --tests io.opentelemetry.instrumentation.cassandra.v4_4.CassandraTest.batchStatementWithDifferentQueries :instrumentation:pulsar:pulsar-2.8:javaagent:test --tests io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.PulsarClientTest.testConsumeMultiTopics .\gradlew.bat :instrumentation:pulsar:pulsar-2.8:javaagent:test -PtestLatestDeps=true --tests io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.PulsarClientTest.testConsumeMultiTopics .\gradlew.bat :instrumentation:cassandra:cassandra-3.0:javaagent:spotlessCheck :instrumentation:cassandra:cassandra-common-4.0:testing:spotlessCheck :instrumentation:pulsar:pulsar-2.8:javaagent:spotlessCheck --- .../cassandra/v3_0/CassandraClientTest.java | 4 +- .../common/v4_0/AbstractCassandraTest.java | 4 +- .../pulsar/v2_8/PulsarClientTest.java | 136 ++++++++++-------- 3 files changed, 81 insertions(+), 63 deletions(-) diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java index 3ace81a4adc9..17925b675ba1 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java @@ -306,7 +306,7 @@ void batchStatementWithSameQuery() { session.execute("CREATE TABLE batch_same_test.users ( name text PRIMARY KEY, age int )"); PreparedStatement preparedStatement = session.prepare("INSERT INTO batch_same_test.users (name, age) values (?, ?)"); - testing.waitForTraces(4); + testing.waitForTraces(3); testing.clearData(); BatchStatement batchStatement = @@ -357,7 +357,7 @@ void batchStatementWithDifferentQueries() { session.execute("CREATE TABLE batch_mixed_test.users ( name text PRIMARY KEY, age int )"); PreparedStatement insertStatement = session.prepare("INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?)"); - testing.waitForTraces(4); + testing.waitForTraces(3); testing.clearData(); BatchStatement batchStatement = diff --git a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java index 3da7f2e41e36..9964fb8e8530 100644 --- a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java +++ b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java @@ -197,7 +197,7 @@ void batchStatementWithSameQuery() { session.execute("CREATE TABLE batch_same_test.users ( name text PRIMARY KEY, age int )"); PreparedStatement preparedStatement = session.prepare("INSERT INTO batch_same_test.users (name, age) values (?, ?)"); - testing().waitForTraces(4); + testing().waitForTraces(3); testing().clearData(); BatchStatement batchStatement = @@ -266,7 +266,7 @@ void batchStatementWithDifferentQueries() { session.execute("CREATE TABLE batch_mixed_test.users ( name text PRIMARY KEY, age int )"); PreparedStatement insertStatement = session.prepare("INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?)"); - testing().waitForTraces(4); + testing().waitForTraces(3); testing().clearData(); BatchStatement batchStatement = diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index 2c34ede8c5ca..fe00b3ada35c 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -6,20 +6,25 @@ package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; -import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; +import java.time.Duration; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -588,66 +593,79 @@ void testConsumeMultiTopics() throws Exception { latch.await(1, MINUTES); - AtomicReference producerSpan = new AtomicReference<>(); - AtomicReference producerSpan2 = new AtomicReference<>(); - testing.waitAndAssertSortedTraces( - orderByRootSpanName("parent1", topic1 + " receive", "parent2", topic2 + " receive"), - trace -> { - trace.hasSpansSatisfyingExactly( - span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(), - span -> - span.hasName(topic1 + " publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - sendAttributes(topic1, msgId1.toString(), false))); + await() + .atMost(Duration.ofSeconds(20)) + .untilAsserted( + () -> { + List spans = testing.spans(); + SpanData topic1Producer = findSpan(spans, topic1 + " publish", SpanKind.PRODUCER); + SpanData topic2Producer = findSpan(spans, topic2 + " publish", SpanKind.PRODUCER); + + assertThat(topic1Producer) + .hasAttributesSatisfyingExactly(sendAttributes(topic1, msgId1.toString(), false)); + assertThat(topic2Producer) + .hasAttributesSatisfyingExactly(sendAttributes(topic2, msgId2.toString(), false)); + + assertThat(spans) + .filteredOn(span -> span.getName().equals(topic1 + " receive")) + .anySatisfy( + span -> assertReceiveSpan(span, topic1Producer, topic1, msgId1.toString())); + assertThat(spans) + .filteredOn(span -> span.getName().equals(topic2 + " receive")) + .anySatisfy( + span -> assertReceiveSpan(span, topic2Producer, topic2, msgId2.toString())); + + assertThat(spans) + .filteredOn(span -> span.getName().equals(topic1 + " process")) + .singleElement() + .satisfies( + span -> + assertThat(span) + .hasKind(SpanKind.CONSUMER) + .hasLinks(LinkData.create(topic1Producer.getSpanContext())) + .hasAttributesSatisfyingExactly( + processAttributes(topic1, msgId1.toString(), false))); + assertThat(spans) + .filteredOn(span -> span.getName().equals(topic2 + " process")) + .singleElement() + .satisfies( + span -> + assertThat(span) + .hasKind(SpanKind.CONSUMER) + .hasLinks(LinkData.create(topic2Producer.getSpanContext())) + .hasAttributesSatisfyingExactly( + processAttributes(topic2, msgId2.toString(), false))); + }); + } - producerSpan.set(trace.getSpan(1)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName(topic1 + " receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) - .hasAttributesSatisfyingExactly( - receiveAttributes(topic1, msgId1.toString(), false)), - span -> - span.hasName(topic1 + " process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) - .hasAttributesSatisfyingExactly( - processAttributes(topic1, msgId1.toString(), false))), - trace -> { - trace.hasSpansSatisfyingExactly( - span -> span.hasName("parent2").hasKind(SpanKind.INTERNAL).hasNoParent(), - span -> - span.hasName(topic2 + " publish") - .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - sendAttributes(topic2, msgId2.toString(), false))); + private static SpanData findSpan(List spans, String name, SpanKind kind) { + return spans.stream() + .filter(span -> span.getName().equals(name) && span.getKind() == kind) + .findFirst() + .orElseThrow(() -> new AssertionError("Could not find span " + name)); + } - producerSpan2.set(trace.getSpan(1)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName(topic2 + " receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasLinks(LinkData.create(producerSpan2.get().getSpanContext())) - .hasAttributesSatisfyingExactly( - receiveAttributes(topic2, msgId2.toString(), false)), - span -> - span.hasName(topic2 + " process") - .hasKind(SpanKind.CONSUMER) - .hasParent(trace.getSpan(0)) - .hasLinks(LinkData.create(producerSpan2.get().getSpanContext())) - .hasAttributesSatisfyingExactly( - processAttributes(topic2, msgId2.toString(), false)))); + @SuppressWarnings("deprecation") // using deprecated semconv + private static void assertReceiveSpan( + SpanData span, SpanData producerSpan, String topic, String messageId) { + assertThat(span) + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinks(LinkData.create(producerSpan.getSpanContext())); + + Attributes attributes = span.getAttributes(); + assertThat(attributes.get(MESSAGING_SYSTEM)).isEqualTo("pulsar"); + assertThat(attributes.get(MESSAGING_DESTINATION_NAME)).isEqualTo(topic); + assertThat(attributes.get(MESSAGING_OPERATION)).isEqualTo("receive"); + assertThat(attributes.get(SERVER_ADDRESS)).isEqualTo(brokerHost); + assertThat(attributes.get(SERVER_PORT)).isEqualTo(brokerPort); + assertThat( + attributes.get(MESSAGING_MESSAGE_ID) != null + || attributes.get(MESSAGING_BATCH_MESSAGE_COUNT) != null) + .isTrue(); + if (attributes.get(MESSAGING_MESSAGE_ID) != null) { + assertThat(attributes.get(MESSAGING_MESSAGE_ID)).isEqualTo(messageId); + } } @SuppressWarnings("deprecation") // using deprecated semconv From 4dedb7d930a4eb3b60788eec2f674dd3e51e337f Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Fri, 12 Jun 2026 08:53:33 -0700 Subject: [PATCH 10/10] revert unrelated --- .../pulsar/v2_8/PulsarClientTest.java | 136 ++++++++---------- 1 file changed, 59 insertions(+), 77 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index fe00b3ada35c..2c34ede8c5ca 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -6,25 +6,20 @@ package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; -import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; -import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.awaitility.Awaitility.await; -import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; -import java.time.Duration; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -593,79 +588,66 @@ void testConsumeMultiTopics() throws Exception { latch.await(1, MINUTES); - await() - .atMost(Duration.ofSeconds(20)) - .untilAsserted( - () -> { - List spans = testing.spans(); - SpanData topic1Producer = findSpan(spans, topic1 + " publish", SpanKind.PRODUCER); - SpanData topic2Producer = findSpan(spans, topic2 + " publish", SpanKind.PRODUCER); - - assertThat(topic1Producer) - .hasAttributesSatisfyingExactly(sendAttributes(topic1, msgId1.toString(), false)); - assertThat(topic2Producer) - .hasAttributesSatisfyingExactly(sendAttributes(topic2, msgId2.toString(), false)); - - assertThat(spans) - .filteredOn(span -> span.getName().equals(topic1 + " receive")) - .anySatisfy( - span -> assertReceiveSpan(span, topic1Producer, topic1, msgId1.toString())); - assertThat(spans) - .filteredOn(span -> span.getName().equals(topic2 + " receive")) - .anySatisfy( - span -> assertReceiveSpan(span, topic2Producer, topic2, msgId2.toString())); - - assertThat(spans) - .filteredOn(span -> span.getName().equals(topic1 + " process")) - .singleElement() - .satisfies( - span -> - assertThat(span) - .hasKind(SpanKind.CONSUMER) - .hasLinks(LinkData.create(topic1Producer.getSpanContext())) - .hasAttributesSatisfyingExactly( - processAttributes(topic1, msgId1.toString(), false))); - assertThat(spans) - .filteredOn(span -> span.getName().equals(topic2 + " process")) - .singleElement() - .satisfies( - span -> - assertThat(span) - .hasKind(SpanKind.CONSUMER) - .hasLinks(LinkData.create(topic2Producer.getSpanContext())) - .hasAttributesSatisfyingExactly( - processAttributes(topic2, msgId2.toString(), false))); - }); - } + AtomicReference producerSpan = new AtomicReference<>(); + AtomicReference producerSpan2 = new AtomicReference<>(); + testing.waitAndAssertSortedTraces( + orderByRootSpanName("parent1", topic1 + " receive", "parent2", topic2 + " receive"), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic1 + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic1, msgId1.toString(), false))); - private static SpanData findSpan(List spans, String name, SpanKind kind) { - return spans.stream() - .filter(span -> span.getName().equals(name) && span.getKind() == kind) - .findFirst() - .orElseThrow(() -> new AssertionError("Could not find span " + name)); - } + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(topic1 + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic1, msgId1.toString(), false)), + span -> + span.hasName(topic1 + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + processAttributes(topic1, msgId1.toString(), false))), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent2").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic2 + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic2, msgId2.toString(), false))); - @SuppressWarnings("deprecation") // using deprecated semconv - private static void assertReceiveSpan( - SpanData span, SpanData producerSpan, String topic, String messageId) { - assertThat(span) - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasLinks(LinkData.create(producerSpan.getSpanContext())); - - Attributes attributes = span.getAttributes(); - assertThat(attributes.get(MESSAGING_SYSTEM)).isEqualTo("pulsar"); - assertThat(attributes.get(MESSAGING_DESTINATION_NAME)).isEqualTo(topic); - assertThat(attributes.get(MESSAGING_OPERATION)).isEqualTo("receive"); - assertThat(attributes.get(SERVER_ADDRESS)).isEqualTo(brokerHost); - assertThat(attributes.get(SERVER_PORT)).isEqualTo(brokerPort); - assertThat( - attributes.get(MESSAGING_MESSAGE_ID) != null - || attributes.get(MESSAGING_BATCH_MESSAGE_COUNT) != null) - .isTrue(); - if (attributes.get(MESSAGING_MESSAGE_ID) != null) { - assertThat(attributes.get(MESSAGING_MESSAGE_ID)).isEqualTo(messageId); - } + producerSpan2.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(topic2 + " receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinks(LinkData.create(producerSpan2.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic2, msgId2.toString(), false)), + span -> + span.hasName(topic2 + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producerSpan2.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + processAttributes(topic2, msgId2.toString(), false)))); } @SuppressWarnings("deprecation") // using deprecated semconv