Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public String extract(REQUEST request) {
getter, request, batch ? "BATCH" : null, null, analyzedQuery.getStoredProcedureName());
}

MultiQuery multiQuery = MultiQuery.analyzeWithSummary(rawQueryTexts, dialect, false);
MultiQuery multiQuery = MultiQuery.analyzeWithSummary(rawQueryTexts, dialect);
String querySummary = multiQuery.getQuerySummary();
if (querySummary != null) {
return querySummary;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,18 @@ private MultiQuery(
this.querySummary = querySummary;
}

static MultiQuery analyzeWithSummary(
Collection<String> rawQueryTexts, SqlDialect dialect, boolean querySanitizationEnabled) {
UniqueValue uniqueStoredProcedureName = new UniqueValue();
Set<String> uniqueQueryTexts = new LinkedHashSet<>();
UniqueValue uniqueQuerySummary = new UniqueValue();
static MultiQuery analyzeWithSummary(Collection<String> rawQueryTexts, SqlDialect dialect) {
Builder builder = builder();
for (String rawQueryText : rawQueryTexts) {
SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect);
uniqueStoredProcedureName.set(analyzedQuery.getStoredProcedureName());
uniqueQueryTexts.add(querySanitizationEnabled ? analyzedQuery.getQueryText() : rawQueryText);
uniqueQuerySummary.set(analyzedQuery.getQuerySummary());
builder.add(analyzedQuery, rawQueryText);
}

String querySummary = uniqueQuerySummary.getValue();
return new MultiQuery(
uniqueStoredProcedureName.getValue(),
uniqueQueryTexts,
querySummary == null ? "BATCH" : "BATCH " + querySummary);
return builder.build();
}

static Builder builder() {
return new Builder();
}

@Nullable
Expand All @@ -56,6 +51,26 @@ public Set<String> getQueryTexts() {
return queryTexts;
}

static class Builder {
private final UniqueValue uniqueStoredProcedureName = new UniqueValue();
private final Set<String> uniqueQueryTexts = new LinkedHashSet<>();
private final UniqueValue uniqueQuerySummary = new UniqueValue();

void add(SqlQuery analyzedQuery, @Nullable String queryText) {
uniqueStoredProcedureName.set(analyzedQuery.getStoredProcedureName());
uniqueQueryTexts.add(queryText);
uniqueQuerySummary.set(analyzedQuery.getQuerySummary());
}

MultiQuery build() {
String querySummary = uniqueQuerySummary.getValue();
return new MultiQuery(
uniqueStoredProcedureName.getValue(),
uniqueQueryTexts,
querySummary == null ? "BATCH" : "BATCH " + querySummary);
}
}

private static class UniqueValue {
@Nullable private String value;
private boolean valid = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST
if (isBatch) {
attributes.put(DB_OPERATION_BATCH_SIZE, batchSize);
}
boolean parameterizedQuery = getter.isParameterizedQuery(request);
boolean shouldSanitize = querySanitizationEnabled && !parameterizedQuery;
if (rawQueryTexts.size() == 1) {
String rawQueryText = rawQueryTexts.iterator().next();
SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect);
boolean shouldSanitize =
querySanitizationEnabled && !getter.isParameterizedQuery(request, 0);
attributes.put(DB_QUERY_TEXT, shouldSanitize ? analyzedQuery.getQueryText() : rawQueryText);
String querySummary = analyzedQuery.getQuerySummary();
attributes.put(
Expand All @@ -125,9 +125,16 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST
}
attributes.put(DB_STORED_PROCEDURE_NAME, analyzedQuery.getStoredProcedureName());
} else if (rawQueryTexts.size() > 1) {
MultiQuery multiQuery =
MultiQuery.analyzeWithSummary(
getter.getRawQueryTexts(request), dialect, shouldSanitize);
MultiQuery.Builder builder = MultiQuery.builder();
int queryIndex = 0;
for (String rawQueryText : rawQueryTexts) {
SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect);
boolean shouldSanitize =
querySanitizationEnabled && !getter.isParameterizedQuery(request, queryIndex);
builder.add(analyzedQuery, shouldSanitize ? analyzedQuery.getQueryText() : rawQueryText);
queryIndex++;
}
MultiQuery multiQuery = builder.build();
attributes.put(DB_QUERY_TEXT, join("; ", multiQuery.getQueryTexts()));
attributes.put(DB_QUERY_SUMMARY, multiQuery.getQuerySummary());
attributes.put(DB_STORED_PROCEDURE_NAME, multiQuery.getStoredProcedureName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,25 @@ default String getDbQuerySummary(REQUEST request) {
* query does not need to be sanitized. See <a
* href="https://github.com/open-telemetry/semantic-conventions/blob/main/docs/db/database-spans.md#sanitization-of-dbquerytext">sanitization
* of db.query.text</a>.
*
* @deprecated use {@link #isParameterizedQuery(Object, int)} instead
*/
// TODO: make this required to implement
@Deprecated
default boolean isParameterizedQuery(REQUEST request) {
return false;
}

/**
* Returns whether the query at {@code queryIndex} in {@link #getRawQueryTexts(Object)} is
* parameterized.
*
* <p>The {@code queryIndex} is zero-based and follows the iteration order of {@link
* #getRawQueryTexts(Object)}. This supports batch operations where individual entries may have
* different parameterization.
*/
// TODO: make this required to implement
@SuppressWarnings("deprecation")
default boolean isParameterizedQuery(REQUEST request, int queryIndex) {
return isParameterizedQuery(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -108,6 +109,16 @@ static class TestMultiAttributesGetter extends TestAttributesGetter
public Collection<String> getRawQueryTexts(Map<String, Object> map) {
return (Collection<String>) map.get("db.query.texts");
}

@SuppressWarnings("unchecked")
@Override
public boolean isParameterizedQuery(Map<String, Object> map, int queryIndex) {
List<Boolean> parameterizedQueries = (List<Boolean>) map.get("db.query.parameterized");
if (parameterizedQueries == null) {
return super.isParameterizedQuery(map, queryIndex);
}
return parameterizedQueries.get(queryIndex);
}
}

@SuppressWarnings("deprecation") // TODO DB_CONNECTION_STRING deprecation
Expand Down Expand Up @@ -351,6 +362,56 @@ void shouldExtractMultiQueryBatchAttributes() {
assertThat(endAttributes.build().isEmpty()).isTrue();
}

@Test
void shouldExtractMixedParameterizedMultiQueryBatchAttributes() {
// given
Map<String, Object> request = new HashMap<>();
request.put("db.namespace", "potatoes");
request.put(
"db.query.texts",
asList("INSERT INTO potato VALUES('alice', ?)", "UPDATE potato SET name='bob' WHERE id=1"));
request.put("db.query.parameterized", asList(true, false));
request.put(DB_OPERATION_BATCH_SIZE.getKey(), 2L);

Context context = Context.root();

AttributesExtractor<Map<String, Object>, Void> underTest =
SqlClientAttributesExtractor.create(new TestMultiAttributesGetter());

// when
AttributesBuilder startAttributes = Attributes.builder();
underTest.onStart(startAttributes, context, request);

AttributesBuilder endAttributes = Attributes.builder();
underTest.onEnd(endAttributes, context, request, null, null);

// then
if (emitStableDatabaseSemconv() && emitOldDatabaseSemconv()) {
assertThat(startAttributes.build())
.containsOnly(
entry(DB_NAME, "potatoes"),
entry(DB_NAMESPACE, "potatoes"),
entry(
DB_QUERY_TEXT,
"INSERT INTO potato VALUES('alice', ?); UPDATE potato SET name=? WHERE id=?"),
entry(DB_QUERY_SUMMARY, "BATCH"),
entry(DB_OPERATION_BATCH_SIZE, 2L));
} else if (emitOldDatabaseSemconv()) {
assertThat(startAttributes.build()).containsOnly(entry(DB_NAME, "potatoes"));
} else if (emitStableDatabaseSemconv()) {
assertThat(startAttributes.build())
.containsOnly(
entry(DB_NAMESPACE, "potatoes"),
entry(
DB_QUERY_TEXT,
"INSERT INTO potato VALUES('alice', ?); UPDATE potato SET name=? WHERE id=?"),
entry(DB_QUERY_SUMMARY, "BATCH"),
entry(DB_OPERATION_BATCH_SIZE, 2L));
}

assertThat(endAttributes.build().isEmpty()).isTrue();
}

@Test
void shouldIgnoreBatchSizeOne() {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,113 @@

package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;

import static java.util.Collections.singleton;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.RegularStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;

@AutoValue
abstract class CassandraRequest {

public static CassandraRequest create(
Session session, String queryText, boolean parameterizedQuery) {
return new AutoValue_CassandraRequest(session, queryText, parameterizedQuery);
static CassandraRequest create(Session session, String queryText, boolean parameterizedQuery) {
return create(session, singleton(queryText), parameterizedQuery, null, null);
}

static CassandraRequest create(Session session, String queryText) {
return create(session, singleton(queryText), false, null, null);
}

static CassandraRequest create(Session session, Statement statement) {
if (statement instanceof BatchStatement) {
return create(session, (BatchStatement) statement);
}
return create(
session, singleton(getQuery(statement)), statement instanceof BoundStatement, null, null);
}

private static CassandraRequest create(Session session, BatchStatement batchStatement) {
List<String> queryTexts = new ArrayList<>();
List<Boolean> mixedParameterizedQueries = null;
boolean allQueriesParameterized = true;
Boolean firstParameterizedQuery = null;
int queryIndex = 0;
for (Statement batchEntry : batchStatement.getStatements()) {
queryTexts.add(getQuery(batchEntry));
boolean parameterizedQuery = batchEntry instanceof BoundStatement;
if (!parameterizedQuery) {
allQueriesParameterized = false;
}
if (firstParameterizedQuery == null) {
firstParameterizedQuery = parameterizedQuery;
} else if (parameterizedQuery != firstParameterizedQuery
&& mixedParameterizedQueries == null) {
mixedParameterizedQueries = new ArrayList<>(batchStatement.size());
for (int previousQueryIndex = 0; previousQueryIndex < queryIndex; previousQueryIndex++) {
mixedParameterizedQueries.add(firstParameterizedQuery);
}
}
if (mixedParameterizedQueries != null) {
mixedParameterizedQueries.add(parameterizedQuery);
}
queryIndex++;
}
boolean allQueriesParameterizedResult = allQueriesParameterized;
if (mixedParameterizedQueries == null && firstParameterizedQuery != null) {
allQueriesParameterizedResult = firstParameterizedQuery;
}
return create(
session,
queryTexts,
allQueriesParameterizedResult,
mixedParameterizedQueries,
Long.valueOf(batchStatement.size()));
}

private static CassandraRequest create(
Session session,
Collection<String> queryTexts,
boolean allQueriesParameterized,
@Nullable List<Boolean> mixedParameterizedQueries,
@Nullable Long batchSize) {
return new AutoValue_CassandraRequest(
session, queryTexts, allQueriesParameterized, mixedParameterizedQueries, batchSize);
}

private static String getQuery(Statement statement) {
String query = null;
if (statement instanceof BoundStatement) {
query = ((BoundStatement) statement).preparedStatement().getQueryString();
} else if (statement instanceof RegularStatement) {
query = ((RegularStatement) statement).getQueryString();
}

return query == null ? "" : query;
}

public abstract Session getSession();
abstract Session getSession();

public abstract String getQueryText();
abstract Collection<String> getQueryTexts();

abstract boolean allQueriesParameterized();

@Nullable
abstract List<Boolean> mixedParameterizedQueries();

boolean isParameterizedQuery(int queryIndex) {
List<Boolean> mixedParameterizedQueries = mixedParameterizedQueries();
return mixedParameterizedQueries == null
? allQueriesParameterized()
: mixedParameterizedQueries.get(queryIndex);
}

public abstract boolean isParameterizedQuery();
@Nullable
abstract Long getBatchSize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;

import static io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlDialect.DOUBLE_QUOTES_ARE_IDENTIFIERS;
import static java.util.Collections.singleton;

import com.datastax.driver.core.ExecutionInfo;
import io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlClientAttributesGetter;
Expand Down Expand Up @@ -39,7 +38,13 @@ public String getDbNamespace(CassandraRequest request) {

@Override
public Collection<String> getRawQueryTexts(CassandraRequest request) {
return singleton(request.getQueryText());
return request.getQueryTexts();
}

@Override
@Nullable
public Long getDbOperationBatchSize(CassandraRequest request) {
return request.getBatchSize();
}

@Nullable
Expand All @@ -50,7 +55,7 @@ public InetSocketAddress getNetworkPeerInetSocketAddress(
}

@Override
public boolean isParameterizedQuery(CassandraRequest request) {
return request.isParameterizedQuery();
public boolean isParameterizedQuery(CassandraRequest request, int queryIndex) {
return request.isParameterizedQuery(queryIndex);
}
}
Loading
Loading