diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/DbClientSpanNameExtractor.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/DbClientSpanNameExtractor.java index 9dc38ca35aeb..28172e8cb82e 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/DbClientSpanNameExtractor.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/DbClientSpanNameExtractor.java @@ -227,7 +227,7 @@ public String extract(REQUEST request) { getter, request, batch ? "BATCH" : null, null, analyzedQuery.getStoredProcedureName()); } - MultiQuery multiQuery = MultiQuery.analyzeWithSummary(rawQueryTexts, dialect, false); + MultiQuery multiQuery = MultiQuery.analyzeWithSummary(rawQueryTexts, dialect); String querySummary = multiQuery.getQuerySummary(); if (querySummary != null) { return querySummary; diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java index a0ef7636e331..29479d915f1b 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java @@ -23,23 +23,18 @@ private MultiQuery( this.querySummary = querySummary; } - static MultiQuery analyzeWithSummary( - Collection rawQueryTexts, SqlDialect dialect, boolean querySanitizationEnabled) { - UniqueValue uniqueStoredProcedureName = new UniqueValue(); - Set uniqueQueryTexts = new LinkedHashSet<>(); - UniqueValue uniqueQuerySummary = new UniqueValue(); + static MultiQuery analyzeWithSummary(Collection rawQueryTexts, SqlDialect dialect) { + Builder builder = builder(); for (String rawQueryText : rawQueryTexts) { SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect); - uniqueStoredProcedureName.set(analyzedQuery.getStoredProcedureName()); - uniqueQueryTexts.add(querySanitizationEnabled ? analyzedQuery.getQueryText() : rawQueryText); - uniqueQuerySummary.set(analyzedQuery.getQuerySummary()); + builder.add(analyzedQuery, rawQueryText); } - String querySummary = uniqueQuerySummary.getValue(); - return new MultiQuery( - uniqueStoredProcedureName.getValue(), - uniqueQueryTexts, - querySummary == null ? "BATCH" : "BATCH " + querySummary); + return builder.build(); + } + + static Builder builder() { + return new Builder(); } @Nullable @@ -56,6 +51,26 @@ public Set getQueryTexts() { return queryTexts; } + static class Builder { + private final UniqueValue uniqueStoredProcedureName = new UniqueValue(); + private final Set uniqueQueryTexts = new LinkedHashSet<>(); + private final UniqueValue uniqueQuerySummary = new UniqueValue(); + + void add(SqlQuery analyzedQuery, @Nullable String queryText) { + uniqueStoredProcedureName.set(analyzedQuery.getStoredProcedureName()); + uniqueQueryTexts.add(queryText); + uniqueQuerySummary.set(analyzedQuery.getQuerySummary()); + } + + MultiQuery build() { + String querySummary = uniqueQuerySummary.getValue(); + return new MultiQuery( + uniqueStoredProcedureName.getValue(), + uniqueQueryTexts, + querySummary == null ? "BATCH" : "BATCH " + querySummary); + } + } + private static class UniqueValue { @Nullable private String value; private boolean valid = true; diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractor.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractor.java index 856c5e6b3f61..874de62b85f5 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractor.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractor.java @@ -109,11 +109,11 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST if (isBatch) { attributes.put(DB_OPERATION_BATCH_SIZE, batchSize); } - boolean parameterizedQuery = getter.isParameterizedQuery(request); - boolean shouldSanitize = querySanitizationEnabled && !parameterizedQuery; if (rawQueryTexts.size() == 1) { String rawQueryText = rawQueryTexts.iterator().next(); SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect); + boolean shouldSanitize = + querySanitizationEnabled && !getter.isParameterizedQuery(request, 0); attributes.put(DB_QUERY_TEXT, shouldSanitize ? analyzedQuery.getQueryText() : rawQueryText); String querySummary = analyzedQuery.getQuerySummary(); attributes.put( @@ -125,9 +125,16 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST } attributes.put(DB_STORED_PROCEDURE_NAME, analyzedQuery.getStoredProcedureName()); } else if (rawQueryTexts.size() > 1) { - MultiQuery multiQuery = - MultiQuery.analyzeWithSummary( - getter.getRawQueryTexts(request), dialect, shouldSanitize); + MultiQuery.Builder builder = MultiQuery.builder(); + int queryIndex = 0; + for (String rawQueryText : rawQueryTexts) { + SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect); + boolean shouldSanitize = + querySanitizationEnabled && !getter.isParameterizedQuery(request, queryIndex); + builder.add(analyzedQuery, shouldSanitize ? analyzedQuery.getQueryText() : rawQueryText); + queryIndex++; + } + MultiQuery multiQuery = builder.build(); attributes.put(DB_QUERY_TEXT, join("; ", multiQuery.getQueryTexts())); attributes.put(DB_QUERY_SUMMARY, multiQuery.getQuerySummary()); attributes.put(DB_STORED_PROCEDURE_NAME, multiQuery.getStoredProcedureName()); diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesGetter.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesGetter.java index d3ee2b078192..c3b2ac869b47 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesGetter.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesGetter.java @@ -71,9 +71,25 @@ default String getDbQuerySummary(REQUEST request) { * query does not need to be sanitized. See sanitization * of db.query.text. + * + * @deprecated use {@link #isParameterizedQuery(Object, int)} instead */ - // TODO: make this required to implement + @Deprecated default boolean isParameterizedQuery(REQUEST request) { return false; } + + /** + * Returns whether the query at {@code queryIndex} in {@link #getRawQueryTexts(Object)} is + * parameterized. + * + *

The {@code queryIndex} is zero-based and follows the iteration order of {@link + * #getRawQueryTexts(Object)}. This supports batch operations where individual entries may have + * different parameterization. + */ + // TODO: make this required to implement + @SuppressWarnings("deprecation") + default boolean isParameterizedQuery(REQUEST request, int queryIndex) { + return isParameterizedQuery(request); + } } diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java index 22c873881cde..20723c66bc49 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java @@ -36,6 +36,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.junit.jupiter.api.Test; @@ -108,6 +109,16 @@ static class TestMultiAttributesGetter extends TestAttributesGetter public Collection getRawQueryTexts(Map map) { return (Collection) map.get("db.query.texts"); } + + @SuppressWarnings("unchecked") + @Override + public boolean isParameterizedQuery(Map map, int queryIndex) { + List parameterizedQueries = (List) map.get("db.query.parameterized"); + if (parameterizedQueries == null) { + return super.isParameterizedQuery(map, queryIndex); + } + return parameterizedQueries.get(queryIndex); + } } @SuppressWarnings("deprecation") // TODO DB_CONNECTION_STRING deprecation @@ -351,6 +362,56 @@ void shouldExtractMultiQueryBatchAttributes() { assertThat(endAttributes.build().isEmpty()).isTrue(); } + @Test + void shouldExtractMixedParameterizedMultiQueryBatchAttributes() { + // given + Map request = new HashMap<>(); + request.put("db.namespace", "potatoes"); + request.put( + "db.query.texts", + asList("INSERT INTO potato VALUES('alice', ?)", "UPDATE potato SET name='bob' WHERE id=1")); + request.put("db.query.parameterized", asList(true, false)); + request.put(DB_OPERATION_BATCH_SIZE.getKey(), 2L); + + Context context = Context.root(); + + AttributesExtractor, Void> underTest = + SqlClientAttributesExtractor.create(new TestMultiAttributesGetter()); + + // when + AttributesBuilder startAttributes = Attributes.builder(); + underTest.onStart(startAttributes, context, request); + + AttributesBuilder endAttributes = Attributes.builder(); + underTest.onEnd(endAttributes, context, request, null, null); + + // then + if (emitStableDatabaseSemconv() && emitOldDatabaseSemconv()) { + assertThat(startAttributes.build()) + .containsOnly( + entry(DB_NAME, "potatoes"), + entry(DB_NAMESPACE, "potatoes"), + entry( + DB_QUERY_TEXT, + "INSERT INTO potato VALUES('alice', ?); UPDATE potato SET name=? WHERE id=?"), + entry(DB_QUERY_SUMMARY, "BATCH"), + entry(DB_OPERATION_BATCH_SIZE, 2L)); + } else if (emitOldDatabaseSemconv()) { + assertThat(startAttributes.build()).containsOnly(entry(DB_NAME, "potatoes")); + } else if (emitStableDatabaseSemconv()) { + assertThat(startAttributes.build()) + .containsOnly( + entry(DB_NAMESPACE, "potatoes"), + entry( + DB_QUERY_TEXT, + "INSERT INTO potato VALUES('alice', ?); UPDATE potato SET name=? WHERE id=?"), + entry(DB_QUERY_SUMMARY, "BATCH"), + entry(DB_OPERATION_BATCH_SIZE, 2L)); + } + + assertThat(endAttributes.build().isEmpty()).isTrue(); + } + @Test void shouldIgnoreBatchSizeOne() { // given diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java index 3dd0971e668a..90b21bd95ad5 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java @@ -5,20 +5,113 @@ package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0; +import static java.util.Collections.singleton; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.RegularStatement; import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; @AutoValue abstract class CassandraRequest { - public static CassandraRequest create( - Session session, String queryText, boolean parameterizedQuery) { - return new AutoValue_CassandraRequest(session, queryText, parameterizedQuery); + static CassandraRequest create(Session session, String queryText, boolean parameterizedQuery) { + return create(session, singleton(queryText), parameterizedQuery, null, null); + } + + static CassandraRequest create(Session session, String queryText) { + return create(session, singleton(queryText), false, null, null); + } + + static CassandraRequest create(Session session, Statement statement) { + if (statement instanceof BatchStatement) { + return create(session, (BatchStatement) statement); + } + return create( + session, singleton(getQuery(statement)), statement instanceof BoundStatement, null, null); + } + + private static CassandraRequest create(Session session, BatchStatement batchStatement) { + List queryTexts = new ArrayList<>(); + List mixedParameterizedQueries = null; + boolean allQueriesParameterized = true; + Boolean firstParameterizedQuery = null; + int queryIndex = 0; + for (Statement batchEntry : batchStatement.getStatements()) { + queryTexts.add(getQuery(batchEntry)); + boolean parameterizedQuery = batchEntry instanceof BoundStatement; + if (!parameterizedQuery) { + allQueriesParameterized = false; + } + if (firstParameterizedQuery == null) { + firstParameterizedQuery = parameterizedQuery; + } else if (parameterizedQuery != firstParameterizedQuery + && mixedParameterizedQueries == null) { + mixedParameterizedQueries = new ArrayList<>(batchStatement.size()); + for (int previousQueryIndex = 0; previousQueryIndex < queryIndex; previousQueryIndex++) { + mixedParameterizedQueries.add(firstParameterizedQuery); + } + } + if (mixedParameterizedQueries != null) { + mixedParameterizedQueries.add(parameterizedQuery); + } + queryIndex++; + } + boolean allQueriesParameterizedResult = allQueriesParameterized; + if (mixedParameterizedQueries == null && firstParameterizedQuery != null) { + allQueriesParameterizedResult = firstParameterizedQuery; + } + return create( + session, + queryTexts, + allQueriesParameterizedResult, + mixedParameterizedQueries, + Long.valueOf(batchStatement.size())); + } + + private static CassandraRequest create( + Session session, + Collection queryTexts, + boolean allQueriesParameterized, + @Nullable List mixedParameterizedQueries, + @Nullable Long batchSize) { + return new AutoValue_CassandraRequest( + session, queryTexts, allQueriesParameterized, mixedParameterizedQueries, batchSize); + } + + private static String getQuery(Statement statement) { + String query = null; + if (statement instanceof BoundStatement) { + query = ((BoundStatement) statement).preparedStatement().getQueryString(); + } else if (statement instanceof RegularStatement) { + query = ((RegularStatement) statement).getQueryString(); + } + + return query == null ? "" : query; } - public abstract Session getSession(); + abstract Session getSession(); - public abstract String getQueryText(); + abstract Collection getQueryTexts(); + + abstract boolean allQueriesParameterized(); + + @Nullable + abstract List mixedParameterizedQueries(); + + boolean isParameterizedQuery(int queryIndex) { + List mixedParameterizedQueries = mixedParameterizedQueries(); + return mixedParameterizedQueries == null + ? allQueriesParameterized() + : mixedParameterizedQueries.get(queryIndex); + } - public abstract boolean isParameterizedQuery(); + @Nullable + abstract Long getBatchSize(); } diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraSqlAttributesGetter.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraSqlAttributesGetter.java index f3631de1b095..f1e962d6dc21 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraSqlAttributesGetter.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraSqlAttributesGetter.java @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0; import static io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlDialect.DOUBLE_QUOTES_ARE_IDENTIFIERS; -import static java.util.Collections.singleton; import com.datastax.driver.core.ExecutionInfo; import io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlClientAttributesGetter; @@ -39,7 +38,13 @@ public String getDbNamespace(CassandraRequest request) { @Override public Collection getRawQueryTexts(CassandraRequest request) { - return singleton(request.getQueryText()); + return request.getQueryTexts(); + } + + @Override + @Nullable + public Long getDbOperationBatchSize(CassandraRequest request) { + return request.getBatchSize(); } @Nullable @@ -50,7 +55,7 @@ public InetSocketAddress getNetworkPeerInetSocketAddress( } @Override - public boolean isParameterizedQuery(CassandraRequest request) { - return request.isParameterizedQuery(); + public boolean isParameterizedQuery(CassandraRequest request, int queryIndex) { + return request.isParameterizedQuery(queryIndex); } } diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/TracingSession.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/TracingSession.java index 799677a70d74..83454f1ab6d7 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/TracingSession.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/TracingSession.java @@ -7,7 +7,6 @@ import static io.opentelemetry.javaagent.instrumentation.cassandra.v3_0.CassandraSingletons.instrumenter; -import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.CloseFuture; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.PreparedStatement; @@ -48,7 +47,7 @@ public ListenableFuture initAsync() { @Override public ResultSet execute(String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); Context context = instrumenter().start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -93,9 +92,7 @@ public ResultSet execute(String query, Map values) { @Override public ResultSet execute(Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); Context context = instrumenter().start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -110,7 +107,7 @@ public ResultSet execute(Statement statement) { @Override public ResultSetFuture executeAsync(String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); Context context = instrumenter().start(Context.current(), request); try (Scope ignored = context.makeCurrent()) { ResultSetFuture future = session.executeAsync(query); @@ -152,9 +149,7 @@ public ResultSetFuture executeAsync(String query, Map values) { @Override public ResultSetFuture executeAsync(Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); Context context = instrumenter().start(Context.current(), request); try (Scope ignored = context.makeCurrent()) { ResultSetFuture future = session.executeAsync(statement); @@ -211,17 +206,6 @@ public State getState() { return session.getState(); } - private static String getQuery(Statement statement) { - String query = null; - if (statement instanceof BoundStatement) { - query = ((BoundStatement) statement).preparedStatement().getQueryString(); - } else if (statement instanceof RegularStatement) { - query = ((RegularStatement) statement).getQueryString(); - } - - return query == null ? "" : query; - } - private static void addCallbackToEndSpan( ResultSetFuture future, Context context, CassandraRequest request) { Futures.addCallback( diff --git a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java index 4de7930899d2..17925b675ba1 100644 --- a/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java +++ b/instrumentation/cassandra/cassandra-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraClientTest.java @@ -10,6 +10,7 @@ import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.semconv.DbAttributes.DB_COLLECTION_NAME; +import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_BATCH_SIZE; import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_NAME; import static io.opentelemetry.semconv.DbAttributes.DB_QUERY_SUMMARY; import static io.opentelemetry.semconv.DbAttributes.DB_SYSTEM_NAME; @@ -26,9 +27,12 @@ import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DbSystemNameIncubatingValues.CASSANDRA; import static org.junit.jupiter.api.Named.named; +import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; @@ -291,6 +295,104 @@ void testMetrics() { SERVER_PORT); } + @Test + void batchStatementWithSameQuery() { + Session session = cluster.connect(); + cleanup.deferCleanup(session); + + session.execute("DROP KEYSPACE IF EXISTS batch_same_test"); + session.execute( + "CREATE KEYSPACE batch_same_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); + session.execute("CREATE TABLE batch_same_test.users ( name text PRIMARY KEY, age int )"); + PreparedStatement preparedStatement = + session.prepare("INSERT INTO batch_same_test.users (name, age) values (?, ?)"); + testing.waitForTraces(3); + testing.clearData(); + + BatchStatement batchStatement = + new BatchStatement() + .add(preparedStatement.bind("alice", 1)) + .add(preparedStatement.bind("bob", 2)); + session.execute(batchStatement); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName( + emitStableDatabaseSemconv() + ? "BATCH INSERT batch_same_test.users" + : "DB Query") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_TYPE, emitStableDatabaseSemconv() ? null : "ipv4"), + equalTo(SERVER_ADDRESS, cassandraHost), + equalTo(SERVER_PORT, cassandraPort), + equalTo(NETWORK_PEER_ADDRESS, cassandraIp), + equalTo(NETWORK_PEER_PORT, cassandraPort), + equalTo(maybeStable(DB_SYSTEM), CASSANDRA), + equalTo( + maybeStable(DB_STATEMENT), + emitStableDatabaseSemconv() + ? "INSERT INTO batch_same_test.users (name, age) values (?, ?)" + : null), + equalTo( + DB_OPERATION_BATCH_SIZE, emitStableDatabaseSemconv() ? 2L : null), + equalTo( + DB_QUERY_SUMMARY, + emitStableDatabaseSemconv() + ? "BATCH INSERT batch_same_test.users" + : null)))); + } + + @Test + void batchStatementWithDifferentQueries() { + Session session = cluster.connect(); + cleanup.deferCleanup(session); + + session.execute("DROP KEYSPACE IF EXISTS batch_mixed_test"); + session.execute( + "CREATE KEYSPACE batch_mixed_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); + session.execute("CREATE TABLE batch_mixed_test.users ( name text PRIMARY KEY, age int )"); + PreparedStatement insertStatement = + session.prepare("INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?)"); + testing.waitForTraces(3); + testing.clearData(); + + BatchStatement batchStatement = + new BatchStatement() + .add(insertStatement.bind(1)) + .add( + new SimpleStatement( + "UPDATE batch_mixed_test.users SET age = 2 WHERE name = 'alice'")); + session.execute(batchStatement); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(emitStableDatabaseSemconv() ? "BATCH" : "DB Query") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_TYPE, emitStableDatabaseSemconv() ? null : "ipv4"), + equalTo(SERVER_ADDRESS, cassandraHost), + equalTo(SERVER_PORT, cassandraPort), + equalTo(NETWORK_PEER_ADDRESS, cassandraIp), + equalTo(NETWORK_PEER_PORT, cassandraPort), + equalTo(maybeStable(DB_SYSTEM), CASSANDRA), + equalTo( + maybeStable(DB_STATEMENT), + emitStableDatabaseSemconv() + ? "INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?); UPDATE batch_mixed_test.users SET age = ? WHERE name = ?" + : null), + equalTo( + DB_OPERATION_BATCH_SIZE, emitStableDatabaseSemconv() ? 2L : null), + equalTo( + DB_QUERY_SUMMARY, emitStableDatabaseSemconv() ? "BATCH" : null)))); + } + private static Stream provideSyncParameters() { return Stream.of( Arguments.of( diff --git a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java index fd2b1dbeff91..d0920565c80b 100644 --- a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java +++ b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraRequest.java @@ -5,19 +5,121 @@ package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0; +import static java.util.Collections.singleton; + +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BatchableStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.session.Session; import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; @AutoValue abstract class CassandraRequest { - static CassandraRequest create(Session session, String queryText, boolean parameterizedQuery) { - return new AutoValue_CassandraRequest(session, queryText, parameterizedQuery); + static CassandraRequest create(Session session, String queryText) { + return create(session, singleton(queryText), false, null, null); + } + + static CassandraRequest create(Session session, Statement statement) { + if (statement instanceof BatchStatement) { + return create(session, (BatchStatement) statement); + } + return create(session, singleton(getQuery(statement)), hasQueryValues(statement), null, null); + } + + private static CassandraRequest create(Session session, BatchStatement batchStatement) { + List queryTexts = new ArrayList<>(); + List mixedParameterizedQueries = null; + boolean allQueriesParameterized = true; + Boolean firstParameterizedQuery = null; + int queryIndex = 0; + for (BatchableStatement batchEntry : batchStatement) { + queryTexts.add(getQuery(batchEntry)); + boolean parameterizedQuery = hasQueryValues(batchEntry); + if (!parameterizedQuery) { + allQueriesParameterized = false; + } + if (firstParameterizedQuery == null) { + firstParameterizedQuery = parameterizedQuery; + } else if (parameterizedQuery != firstParameterizedQuery + && mixedParameterizedQueries == null) { + mixedParameterizedQueries = new ArrayList<>(batchStatement.size()); + for (int previousQueryIndex = 0; previousQueryIndex < queryIndex; previousQueryIndex++) { + mixedParameterizedQueries.add(firstParameterizedQuery); + } + } + if (mixedParameterizedQueries != null) { + mixedParameterizedQueries.add(parameterizedQuery); + } + queryIndex++; + } + boolean allQueriesParameterizedResult = allQueriesParameterized; + if (mixedParameterizedQueries == null && firstParameterizedQuery != null) { + allQueriesParameterizedResult = firstParameterizedQuery; + } + return create( + session, + queryTexts, + allQueriesParameterizedResult, + mixedParameterizedQueries, + Long.valueOf(batchStatement.size())); + } + + private static CassandraRequest create( + Session session, + Collection queryTexts, + boolean allQueriesParameterized, + @Nullable List mixedParameterizedQueries, + @Nullable Long batchSize) { + return new AutoValue_CassandraRequest( + session, queryTexts, allQueriesParameterized, mixedParameterizedQueries, batchSize); + } + + private static String getQuery(Statement statement) { + String query = null; + if (statement instanceof SimpleStatement) { + query = ((SimpleStatement) statement).getQuery(); + } else if (statement instanceof BoundStatement) { + query = ((BoundStatement) statement).getPreparedStatement().getQuery(); + } + + return query == null ? "" : query; + } + + private static boolean hasQueryValues(Statement statement) { + if (statement instanceof BoundStatement) { + return true; + } + if (statement instanceof SimpleStatement) { + SimpleStatement simpleStatement = (SimpleStatement) statement; + return !simpleStatement.getPositionalValues().isEmpty() + || !simpleStatement.getNamedValues().isEmpty(); + } + return false; } abstract Session getSession(); - abstract String getQueryText(); + abstract Collection getQueryTexts(); + + abstract boolean allQueriesParameterized(); + + @Nullable + abstract List mixedParameterizedQueries(); + + boolean isParameterizedQuery(int queryIndex) { + List mixedParameterizedQueries = mixedParameterizedQueries(); + return mixedParameterizedQueries == null + ? allQueriesParameterized() + : mixedParameterizedQueries.get(queryIndex); + } - abstract boolean isParameterizedQuery(); + @Nullable + abstract Long getBatchSize(); } diff --git a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraSqlAttributesGetter.java b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraSqlAttributesGetter.java index b22fd7ad7979..7c3df7067adf 100644 --- a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraSqlAttributesGetter.java +++ b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/CassandraSqlAttributesGetter.java @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0; import static io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlDialect.DOUBLE_QUOTES_ARE_IDENTIFIERS; -import static java.util.Collections.singleton; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; @@ -42,7 +41,13 @@ public String getDbNamespace(CassandraRequest request) { @Override public Collection getRawQueryTexts(CassandraRequest request) { - return singleton(request.getQueryText()); + return request.getQueryTexts(); + } + + @Override + @Nullable + public Long getDbOperationBatchSize(CassandraRequest request) { + return request.getBatchSize(); } @Nullable @@ -63,7 +68,7 @@ public InetSocketAddress getNetworkPeerInetSocketAddress( } @Override - public boolean isParameterizedQuery(CassandraRequest request) { - return request.isParameterizedQuery(); + public boolean isParameterizedQuery(CassandraRequest request, int queryIndex) { + return request.isParameterizedQuery(queryIndex); } } diff --git a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/TracingCqlSession.java b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/TracingCqlSession.java index 31dc0f979520..ac07bd01e738 100644 --- a/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/TracingCqlSession.java +++ b/instrumentation/cassandra/cassandra-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v4_0/TracingCqlSession.java @@ -11,10 +11,8 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DriverException; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; -import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.cql.ResultSet; -import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.cql.Statement; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; @@ -67,7 +65,7 @@ static CqlSession wrapSession(CqlSession session) { } private static ResultSet execute(CqlSession session, String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); Context context = instrumenter().start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -81,9 +79,7 @@ private static ResultSet execute(CqlSession session, String query) { } private static ResultSet execute(CqlSession session, Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); Context context = instrumenter().start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -98,14 +94,12 @@ private static ResultSet execute(CqlSession session, Statement statement) { private static CompletionStage executeAsync( CqlSession session, Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); return executeAsync(request, () -> session.executeAsync(statement)); } private static CompletionStage executeAsync(CqlSession session, String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); return executeAsync(request, () -> session.executeAsync(query)); } @@ -144,17 +138,6 @@ private static CompletableFuture wrap(CompletionStage future, Context return result; } - private static String getQuery(Statement statement) { - String query = null; - if (statement instanceof SimpleStatement) { - query = ((SimpleStatement) statement).getQuery(); - } else if (statement instanceof BoundStatement) { - query = ((BoundStatement) statement).getPreparedStatement().getQuery(); - } - - return query == null ? "" : query; - } - @Nullable private static ExecutionInfo getExecutionInfo( @Nullable AsyncResultSet asyncResultSet, @Nullable Throwable throwable) { diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java index 6456cd92174e..1674f1e6c291 100644 --- a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraAttributesExtractor.java @@ -110,6 +110,8 @@ public void onEnd( Statement statement = (Statement) executionInfo.getRequest(); String consistencyLevel; + // getSession() is deprecated only because its visibility will be reduced in the future. + @SuppressWarnings("deprecation") DriverExecutionProfile config = request.getSession().getContext().getConfig().getDefaultProfile(); if (statement.getConsistencyLevel() != null) { diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java index 49be655774ab..629511b1aa70 100644 --- a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraRequest.java @@ -5,20 +5,155 @@ package io.opentelemetry.instrumentation.cassandra.v4_4; +import static java.util.Collections.singleton; + +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BatchableStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.session.Session; import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.annotation.Nullable; @AutoValue public abstract class CassandraRequest { + @Deprecated public static CassandraRequest create( Session session, String queryText, boolean parameterizedQuery) { - return new AutoValue_CassandraRequest(session, queryText, parameterizedQuery); + return create(session, singleton(queryText), parameterizedQuery, null, null); + } + + static CassandraRequest create(Session session, String queryText) { + return create(session, singleton(queryText), false, null, null); + } + + static CassandraRequest create(Session session, Statement statement) { + if (statement instanceof BatchStatement) { + return create(session, (BatchStatement) statement); + } + return create(session, singleton(getQuery(statement)), hasQueryValues(statement), null, null); + } + + private static CassandraRequest create(Session session, BatchStatement batchStatement) { + List queryTexts = new ArrayList<>(); + List mixedParameterizedQueries = null; + boolean allQueriesParameterized = true; + Boolean firstParameterizedQuery = null; + int queryIndex = 0; + for (BatchableStatement batchEntry : batchStatement) { + queryTexts.add(getQuery(batchEntry)); + boolean parameterizedQuery = hasQueryValues(batchEntry); + if (!parameterizedQuery) { + allQueriesParameterized = false; + } + if (firstParameterizedQuery == null) { + firstParameterizedQuery = parameterizedQuery; + } else if (parameterizedQuery != firstParameterizedQuery + && mixedParameterizedQueries == null) { + mixedParameterizedQueries = new ArrayList<>(batchStatement.size()); + for (int previousQueryIndex = 0; previousQueryIndex < queryIndex; previousQueryIndex++) { + mixedParameterizedQueries.add(firstParameterizedQuery); + } + } + if (mixedParameterizedQueries != null) { + mixedParameterizedQueries.add(parameterizedQuery); + } + queryIndex++; + } + boolean allQueriesParameterizedResult = allQueriesParameterized; + if (mixedParameterizedQueries == null && firstParameterizedQuery != null) { + allQueriesParameterizedResult = firstParameterizedQuery; + } + return create( + session, + queryTexts, + allQueriesParameterizedResult, + mixedParameterizedQueries, + Long.valueOf(batchStatement.size())); + } + + private static CassandraRequest create( + Session session, + Collection queryTexts, + boolean allQueriesParameterized, + @Nullable List mixedParameterizedQueries, + @Nullable Long batchSize) { + return new AutoValue_CassandraRequest( + session, queryTexts, allQueriesParameterized, mixedParameterizedQueries, batchSize); + } + + private static String getQuery(Statement statement) { + String query = null; + if (statement instanceof SimpleStatement) { + query = ((SimpleStatement) statement).getQuery(); + } else if (statement instanceof BoundStatement) { + query = ((BoundStatement) statement).getPreparedStatement().getQuery(); + } + + return query == null ? "" : query; + } + + private static boolean hasQueryValues(Statement statement) { + if (statement instanceof BoundStatement) { + return true; + } + if (statement instanceof SimpleStatement) { + SimpleStatement simpleStatement = (SimpleStatement) statement; + return !simpleStatement.getPositionalValues().isEmpty() + || !simpleStatement.getNamedValues().isEmpty(); + } + return false; } + /** + * @deprecated this method will be reduced to package-private visibility + */ + @Deprecated public abstract Session getSession(); - public abstract String getQueryText(); + abstract Collection getQueryTexts(); + + /** + * Returns the raw query text. + * + * @deprecated use {@link #getQueryTexts()} instead + */ + @Deprecated + public String getQueryText() { + if (getBatchSize() != null) { + // Preserve previous public API behavior: BatchStatement query text was not captured. + return ""; + } + return getQueryTexts().iterator().next(); + } + + abstract boolean allQueriesParameterized(); + + @Nullable + abstract List mixedParameterizedQueries(); + + /** + * Returns whether all queries in this request are parameterized. + * + * @deprecated use {@link #isParameterizedQuery(int)} instead + */ + @Deprecated + public boolean isParameterizedQuery() { + return allQueriesParameterized(); + } + + boolean isParameterizedQuery(int queryIndex) { + List mixedParameterizedQueries = mixedParameterizedQueries(); + return mixedParameterizedQueries == null + ? allQueriesParameterized() + : mixedParameterizedQueries.get(queryIndex); + } - public abstract boolean isParameterizedQuery(); + @Nullable + abstract Long getBatchSize(); } diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java index 176e1d4e7ee4..c76b82965cf9 100644 --- a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/CassandraSqlAttributesGetter.java @@ -6,7 +6,6 @@ package io.opentelemetry.instrumentation.cassandra.v4_4; import static io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlDialect.DOUBLE_QUOTES_ARE_IDENTIFIERS; -import static java.util.Collections.singleton; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; @@ -36,6 +35,8 @@ public SqlDialect getSqlDialect(CassandraRequest request) { return DOUBLE_QUOTES_ARE_IDENTIFIERS; } + // getSession() is deprecated only because its visibility will be reduced in the future. + @SuppressWarnings("deprecation") @Override @Nullable public String getDbNamespace(CassandraRequest request) { @@ -44,7 +45,13 @@ public String getDbNamespace(CassandraRequest request) { @Override public Collection getRawQueryTexts(CassandraRequest request) { - return singleton(request.getQueryText()); + return request.getQueryTexts(); + } + + @Override + @Nullable + public Long getDbOperationBatchSize(CassandraRequest request) { + return request.getBatchSize(); } @Nullable @@ -67,7 +74,7 @@ public InetSocketAddress getNetworkPeerInetSocketAddress( } @Override - public boolean isParameterizedQuery(CassandraRequest request) { - return request.isParameterizedQuery(); + public boolean isParameterizedQuery(CassandraRequest request, int queryIndex) { + return request.isParameterizedQuery(queryIndex); } } diff --git a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java index 0028a27d50a8..10d5822370a6 100644 --- a/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java +++ b/instrumentation/cassandra/cassandra-4.4/library/src/main/java/io/opentelemetry/instrumentation/cassandra/v4_4/TracingCqlSession.java @@ -12,10 +12,8 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DriverException; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; -import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.cql.ResultSet; -import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.cql.Statement; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; @@ -83,7 +81,7 @@ CqlSession wrapSession(CqlSession session) { } private ResultSet execute(CqlSession session, String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); Context context = instrumenter.start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -97,9 +95,7 @@ private ResultSet execute(CqlSession session, String query) { } private ResultSet execute(CqlSession session, Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); Context context = instrumenter.start(Context.current(), request); ResultSet resultSet; try (Scope ignored = context.makeCurrent()) { @@ -113,14 +109,12 @@ private ResultSet execute(CqlSession session, Statement statement) { } private CompletionStage executeAsync(CqlSession session, Statement statement) { - String query = getQuery(statement); - CassandraRequest request = - CassandraRequest.create(session, query, statement instanceof BoundStatement); + CassandraRequest request = CassandraRequest.create(session, statement); return executeAsync(request, () -> session.executeAsync(statement)); } private CompletionStage executeAsync(CqlSession session, String query) { - CassandraRequest request = CassandraRequest.create(session, query, false); + CassandraRequest request = CassandraRequest.create(session, query); return executeAsync(request, () -> session.executeAsync(query)); } @@ -163,17 +157,6 @@ private static CompletableFuture wrap(CompletionStage future, Context return result; } - private static String getQuery(Statement statement) { - String query = null; - if (statement instanceof SimpleStatement) { - query = ((SimpleStatement) statement).getQuery(); - } else if (statement instanceof BoundStatement) { - query = ((BoundStatement) statement).getPreparedStatement().getQuery(); - } - - return query == null ? "" : query; - } - @Nullable private static ExecutionInfo getExecutionInfo( @Nullable AsyncResultSet asyncResultSet, @Nullable Throwable throwable) { diff --git a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java index d2ae4126d977..9964fb8e8530 100644 --- a/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java +++ b/instrumentation/cassandra/cassandra-common-4.0/testing/src/main/java/io/opentelemetry/cassandra/common/v4_0/AbstractCassandraTest.java @@ -11,6 +11,7 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static io.opentelemetry.semconv.DbAttributes.DB_COLLECTION_NAME; +import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_BATCH_SIZE; import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_NAME; import static io.opentelemetry.semconv.DbAttributes.DB_QUERY_SUMMARY; import static io.opentelemetry.semconv.DbAttributes.DB_SYSTEM_NAME; @@ -37,6 +38,10 @@ import com.datastax.oss.driver.api.core.CqlSessionBuilder; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.DefaultBatchType; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension; @@ -122,6 +127,198 @@ void testMetrics() { SERVER_PORT); } + @Test + void simpleStatementWithValues() { + CqlSession session = getSession(null); + cleanup.deferCleanup(session); + + session.execute("DROP KEYSPACE IF EXISTS simple_values_test"); + session.execute( + "CREATE KEYSPACE simple_values_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); + session.execute("CREATE TABLE simple_values_test.users ( name text PRIMARY KEY, age int )"); + testing().clearData(); + + session.execute( + SimpleStatement.newInstance( + "INSERT INTO simple_values_test.users (name, age) values ('alice', ?)", 1)); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("INSERT simple_values_test.users") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + satisfies( + NETWORK_TYPE, + emitStableDatabaseSemconv() + ? val -> val.isNull() + : val -> val.isIn("ipv4", "ipv6")), + equalTo(SERVER_ADDRESS, cassandraHost), + equalTo(SERVER_PORT, cassandraPort), + equalTo(NETWORK_PEER_ADDRESS, cassandraIp), + equalTo(NETWORK_PEER_PORT, cassandraPort), + equalTo(maybeStable(DB_SYSTEM), CASSANDRA), + equalTo( + maybeStable(DB_STATEMENT), + emitStableDatabaseSemconv() + ? "INSERT INTO simple_values_test.users (name, age) values ('alice', ?)" + : "INSERT INTO simple_values_test.users (name, age) values (?, ?)"), + equalTo( + DB_QUERY_SUMMARY, + emitStableDatabaseSemconv() + ? "INSERT simple_values_test.users" + : null), + equalTo(maybeStable(DB_OPERATION), "INSERT"), + equalTo(maybeStable(DB_CASSANDRA_CONSISTENCY_LEVEL), "LOCAL_ONE"), + equalTo(maybeStable(DB_CASSANDRA_COORDINATOR_DC), "datacenter1"), + satisfies( + maybeStable(DB_CASSANDRA_COORDINATOR_ID), + val -> val.isInstanceOf(String.class)), + satisfies( + maybeStable(DB_CASSANDRA_IDEMPOTENCE), + val -> val.isInstanceOf(Boolean.class)), + equalTo(maybeStable(DB_CASSANDRA_PAGE_SIZE), 5000), + equalTo(maybeStable(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT), 0), + equalTo( + maybeStable(DB_CASSANDRA_TABLE), "simple_values_test.users")))); + } + + @Test + void batchStatementWithSameQuery() { + CqlSession session = getSession(null); + cleanup.deferCleanup(session); + + session.execute("DROP KEYSPACE IF EXISTS batch_same_test"); + session.execute( + "CREATE KEYSPACE batch_same_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); + session.execute("CREATE TABLE batch_same_test.users ( name text PRIMARY KEY, age int )"); + PreparedStatement preparedStatement = + session.prepare("INSERT INTO batch_same_test.users (name, age) values (?, ?)"); + testing().waitForTraces(3); + testing().clearData(); + + BatchStatement batchStatement = + BatchStatement.newInstance( + DefaultBatchType.LOGGED, + preparedStatement.bind("alice", 1), + preparedStatement.bind("bob", 2)); + session.execute(batchStatement); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName( + emitStableDatabaseSemconv() + ? "BATCH INSERT batch_same_test.users" + : "DB Query") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + satisfies( + NETWORK_TYPE, + emitStableDatabaseSemconv() + ? val -> val.isNull() + : val -> val.isIn("ipv4", "ipv6")), + equalTo(SERVER_ADDRESS, cassandraHost), + equalTo(SERVER_PORT, cassandraPort), + equalTo(NETWORK_PEER_ADDRESS, cassandraIp), + equalTo(NETWORK_PEER_PORT, cassandraPort), + equalTo(maybeStable(DB_SYSTEM), CASSANDRA), + equalTo( + maybeStable(DB_STATEMENT), + emitStableDatabaseSemconv() + ? "INSERT INTO batch_same_test.users (name, age) values (?, ?)" + : null), + equalTo( + DB_OPERATION_BATCH_SIZE, + emitStableDatabaseSemconv() ? 2L : null), + equalTo( + DB_QUERY_SUMMARY, + emitStableDatabaseSemconv() + ? "BATCH INSERT batch_same_test.users" + : null), + equalTo(maybeStable(DB_CASSANDRA_CONSISTENCY_LEVEL), "LOCAL_ONE"), + equalTo(maybeStable(DB_CASSANDRA_COORDINATOR_DC), "datacenter1"), + satisfies( + maybeStable(DB_CASSANDRA_COORDINATOR_ID), + val -> val.isInstanceOf(String.class)), + satisfies( + maybeStable(DB_CASSANDRA_IDEMPOTENCE), + val -> val.isInstanceOf(Boolean.class)), + equalTo(maybeStable(DB_CASSANDRA_PAGE_SIZE), 5000), + equalTo( + maybeStable(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT), 0)))); + } + + @Test + void batchStatementWithDifferentQueries() { + CqlSession session = getSession(null); + cleanup.deferCleanup(session); + + session.execute("DROP KEYSPACE IF EXISTS batch_mixed_test"); + session.execute( + "CREATE KEYSPACE batch_mixed_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1}"); + session.execute("CREATE TABLE batch_mixed_test.users ( name text PRIMARY KEY, age int )"); + PreparedStatement insertStatement = + session.prepare("INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?)"); + testing().waitForTraces(3); + testing().clearData(); + + BatchStatement batchStatement = + BatchStatement.newInstance( + DefaultBatchType.LOGGED, + insertStatement.bind(1), + SimpleStatement.newInstance( + "UPDATE batch_mixed_test.users SET age = 2 WHERE name = 'alice'")); + session.execute(batchStatement); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(emitStableDatabaseSemconv() ? "BATCH" : "DB Query") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + satisfies( + NETWORK_TYPE, + emitStableDatabaseSemconv() + ? val -> val.isNull() + : val -> val.isIn("ipv4", "ipv6")), + equalTo(SERVER_ADDRESS, cassandraHost), + equalTo(SERVER_PORT, cassandraPort), + equalTo(NETWORK_PEER_ADDRESS, cassandraIp), + equalTo(NETWORK_PEER_PORT, cassandraPort), + equalTo(maybeStable(DB_SYSTEM), CASSANDRA), + equalTo( + maybeStable(DB_STATEMENT), + emitStableDatabaseSemconv() + ? "INSERT INTO batch_mixed_test.users (name, age) values ('alice', ?); UPDATE batch_mixed_test.users SET age = ? WHERE name = ?" + : null), + equalTo( + DB_OPERATION_BATCH_SIZE, + emitStableDatabaseSemconv() ? 2L : null), + equalTo( + DB_QUERY_SUMMARY, emitStableDatabaseSemconv() ? "BATCH" : null), + equalTo(maybeStable(DB_CASSANDRA_CONSISTENCY_LEVEL), "LOCAL_ONE"), + equalTo(maybeStable(DB_CASSANDRA_COORDINATOR_DC), "datacenter1"), + satisfies( + maybeStable(DB_CASSANDRA_COORDINATOR_ID), + val -> val.isInstanceOf(String.class)), + satisfies( + maybeStable(DB_CASSANDRA_IDEMPOTENCE), + val -> val.isInstanceOf(Boolean.class)), + equalTo(maybeStable(DB_CASSANDRA_PAGE_SIZE), 5000), + equalTo( + maybeStable(DB_CASSANDRA_SPECULATIVE_EXECUTION_COUNT), 0)))); + } + @ParameterizedTest(name = "{index}: {0}") @MethodSource("provideSyncParameters") void syncTest(Parameter parameter) { diff --git a/instrumentation/jdbc/library/src/main/java/io/opentelemetry/instrumentation/jdbc/internal/JdbcAttributesGetter.java b/instrumentation/jdbc/library/src/main/java/io/opentelemetry/instrumentation/jdbc/internal/JdbcAttributesGetter.java index e225180e2edf..0fd7e2b551dc 100644 --- a/instrumentation/jdbc/library/src/main/java/io/opentelemetry/instrumentation/jdbc/internal/JdbcAttributesGetter.java +++ b/instrumentation/jdbc/library/src/main/java/io/opentelemetry/instrumentation/jdbc/internal/JdbcAttributesGetter.java @@ -134,7 +134,8 @@ public Map getDbQueryParameters(DbRequest request) { } @Override - public boolean isParameterizedQuery(DbRequest request) { + public boolean isParameterizedQuery(DbRequest request, int queryIndex) { + // JDBC does not support mixed parameterization within a single request. return request.isParameterizedQuery(); } diff --git a/instrumentation/r2dbc-1.0/library/src/main/java/io/opentelemetry/instrumentation/r2dbc/v1_0/internal/R2dbcSqlAttributesGetter.java b/instrumentation/r2dbc-1.0/library/src/main/java/io/opentelemetry/instrumentation/r2dbc/v1_0/internal/R2dbcSqlAttributesGetter.java index fe8f13275bdf..da4ac1d66c5d 100644 --- a/instrumentation/r2dbc-1.0/library/src/main/java/io/opentelemetry/instrumentation/r2dbc/v1_0/internal/R2dbcSqlAttributesGetter.java +++ b/instrumentation/r2dbc-1.0/library/src/main/java/io/opentelemetry/instrumentation/r2dbc/v1_0/internal/R2dbcSqlAttributesGetter.java @@ -90,7 +90,8 @@ public Integer getServerPort(DbExecution request) { } @Override - public boolean isParameterizedQuery(DbExecution request) { + public boolean isParameterizedQuery(DbExecution request, int queryIndex) { + // R2DBC does not support mixed parameterization within a single request. return request.isParameterizedQuery(); } } diff --git a/instrumentation/vertx/vertx-sql-client/vertx-sql-client-common-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/sqlclient/common/v4_0/VertxSqlClientAttributesGetter.java b/instrumentation/vertx/vertx-sql-client/vertx-sql-client-common-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/sqlclient/common/v4_0/VertxSqlClientAttributesGetter.java index e95ae357cd06..37e1479eb3b3 100644 --- a/instrumentation/vertx/vertx-sql-client/vertx-sql-client-common-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/sqlclient/common/v4_0/VertxSqlClientAttributesGetter.java +++ b/instrumentation/vertx/vertx-sql-client/vertx-sql-client-common-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/sqlclient/common/v4_0/VertxSqlClientAttributesGetter.java @@ -98,7 +98,8 @@ public String getErrorType( } @Override - public boolean isParameterizedQuery(VertxSqlClientRequest request) { + public boolean isParameterizedQuery(VertxSqlClientRequest request, int queryIndex) { + // Vert.x SQL client does not support mixed parameterization within a single request. return request.isParameterizedQuery(); }