Skip to content

Commit a306dde

Browse files
traskCopilot
andauthored
Capture Cassandra batch queries (#18964)
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1 parent b6968e5 commit a306dde

20 files changed

Lines changed: 810 additions & 110 deletions

File tree

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/DbClientSpanNameExtractor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ public String extract(REQUEST request) {
227227
getter, request, batch ? "BATCH" : null, null, analyzedQuery.getStoredProcedureName());
228228
}
229229

230-
MultiQuery multiQuery = MultiQuery.analyzeWithSummary(rawQueryTexts, dialect, false);
230+
MultiQuery multiQuery = MultiQuery.analyzeWithSummary(rawQueryTexts, dialect);
231231
String querySummary = multiQuery.getQuerySummary();
232232
if (querySummary != null) {
233233
return querySummary;

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/MultiQuery.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,18 @@ private MultiQuery(
2323
this.querySummary = querySummary;
2424
}
2525

26-
static MultiQuery analyzeWithSummary(
27-
Collection<String> rawQueryTexts, SqlDialect dialect, boolean querySanitizationEnabled) {
28-
UniqueValue uniqueStoredProcedureName = new UniqueValue();
29-
Set<String> uniqueQueryTexts = new LinkedHashSet<>();
30-
UniqueValue uniqueQuerySummary = new UniqueValue();
26+
static MultiQuery analyzeWithSummary(Collection<String> rawQueryTexts, SqlDialect dialect) {
27+
Builder builder = builder();
3128
for (String rawQueryText : rawQueryTexts) {
3229
SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect);
33-
uniqueStoredProcedureName.set(analyzedQuery.getStoredProcedureName());
34-
uniqueQueryTexts.add(querySanitizationEnabled ? analyzedQuery.getQueryText() : rawQueryText);
35-
uniqueQuerySummary.set(analyzedQuery.getQuerySummary());
30+
builder.add(analyzedQuery, rawQueryText);
3631
}
3732

38-
String querySummary = uniqueQuerySummary.getValue();
39-
return new MultiQuery(
40-
uniqueStoredProcedureName.getValue(),
41-
uniqueQueryTexts,
42-
querySummary == null ? "BATCH" : "BATCH " + querySummary);
33+
return builder.build();
34+
}
35+
36+
static Builder builder() {
37+
return new Builder();
4338
}
4439

4540
@Nullable
@@ -56,6 +51,26 @@ public Set<String> getQueryTexts() {
5651
return queryTexts;
5752
}
5853

54+
static class Builder {
55+
private final UniqueValue uniqueStoredProcedureName = new UniqueValue();
56+
private final Set<String> uniqueQueryTexts = new LinkedHashSet<>();
57+
private final UniqueValue uniqueQuerySummary = new UniqueValue();
58+
59+
void add(SqlQuery analyzedQuery, @Nullable String queryText) {
60+
uniqueStoredProcedureName.set(analyzedQuery.getStoredProcedureName());
61+
uniqueQueryTexts.add(queryText);
62+
uniqueQuerySummary.set(analyzedQuery.getQuerySummary());
63+
}
64+
65+
MultiQuery build() {
66+
String querySummary = uniqueQuerySummary.getValue();
67+
return new MultiQuery(
68+
uniqueStoredProcedureName.getValue(),
69+
uniqueQueryTexts,
70+
querySummary == null ? "BATCH" : "BATCH " + querySummary);
71+
}
72+
}
73+
5974
private static class UniqueValue {
6075
@Nullable private String value;
6176
private boolean valid = true;

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,11 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST
109109
if (isBatch) {
110110
attributes.put(DB_OPERATION_BATCH_SIZE, batchSize);
111111
}
112-
boolean parameterizedQuery = getter.isParameterizedQuery(request);
113-
boolean shouldSanitize = querySanitizationEnabled && !parameterizedQuery;
114112
if (rawQueryTexts.size() == 1) {
115113
String rawQueryText = rawQueryTexts.iterator().next();
116114
SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect);
115+
boolean shouldSanitize =
116+
querySanitizationEnabled && !getter.isParameterizedQuery(request, 0);
117117
attributes.put(DB_QUERY_TEXT, shouldSanitize ? analyzedQuery.getQueryText() : rawQueryText);
118118
String querySummary = analyzedQuery.getQuerySummary();
119119
attributes.put(
@@ -125,9 +125,16 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST
125125
}
126126
attributes.put(DB_STORED_PROCEDURE_NAME, analyzedQuery.getStoredProcedureName());
127127
} else if (rawQueryTexts.size() > 1) {
128-
MultiQuery multiQuery =
129-
MultiQuery.analyzeWithSummary(
130-
getter.getRawQueryTexts(request), dialect, shouldSanitize);
128+
MultiQuery.Builder builder = MultiQuery.builder();
129+
int queryIndex = 0;
130+
for (String rawQueryText : rawQueryTexts) {
131+
SqlQuery analyzedQuery = SqlQueryAnalyzerUtil.analyzeWithSummary(rawQueryText, dialect);
132+
boolean shouldSanitize =
133+
querySanitizationEnabled && !getter.isParameterizedQuery(request, queryIndex);
134+
builder.add(analyzedQuery, shouldSanitize ? analyzedQuery.getQueryText() : rawQueryText);
135+
queryIndex++;
136+
}
137+
MultiQuery multiQuery = builder.build();
131138
attributes.put(DB_QUERY_TEXT, join("; ", multiQuery.getQueryTexts()));
132139
attributes.put(DB_QUERY_SUMMARY, multiQuery.getQuerySummary());
133140
attributes.put(DB_STORED_PROCEDURE_NAME, multiQuery.getStoredProcedureName());

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesGetter.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,25 @@ default String getDbQuerySummary(REQUEST request) {
7171
* query does not need to be sanitized. See <a
7272
* href="https://github.com/open-telemetry/semantic-conventions/blob/main/docs/db/database-spans.md#sanitization-of-dbquerytext">sanitization
7373
* of db.query.text</a>.
74+
*
75+
* @deprecated use {@link #isParameterizedQuery(Object, int)} instead
7476
*/
75-
// TODO: make this required to implement
77+
@Deprecated
7678
default boolean isParameterizedQuery(REQUEST request) {
7779
return false;
7880
}
81+
82+
/**
83+
* Returns whether the query at {@code queryIndex} in {@link #getRawQueryTexts(Object)} is
84+
* parameterized.
85+
*
86+
* <p>The {@code queryIndex} is zero-based and follows the iteration order of {@link
87+
* #getRawQueryTexts(Object)}. This supports batch operations where individual entries may have
88+
* different parameterization.
89+
*/
90+
// TODO: make this required to implement
91+
@SuppressWarnings("deprecation")
92+
default boolean isParameterizedQuery(REQUEST request, int queryIndex) {
93+
return isParameterizedQuery(request);
94+
}
7995
}

instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/SqlClientAttributesExtractorTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
3737
import java.util.Collection;
3838
import java.util.HashMap;
39+
import java.util.List;
3940
import java.util.Map;
4041
import org.junit.jupiter.api.Test;
4142

@@ -108,6 +109,16 @@ static class TestMultiAttributesGetter extends TestAttributesGetter
108109
public Collection<String> getRawQueryTexts(Map<String, Object> map) {
109110
return (Collection<String>) map.get("db.query.texts");
110111
}
112+
113+
@SuppressWarnings("unchecked")
114+
@Override
115+
public boolean isParameterizedQuery(Map<String, Object> map, int queryIndex) {
116+
List<Boolean> parameterizedQueries = (List<Boolean>) map.get("db.query.parameterized");
117+
if (parameterizedQueries == null) {
118+
return super.isParameterizedQuery(map, queryIndex);
119+
}
120+
return parameterizedQueries.get(queryIndex);
121+
}
111122
}
112123

113124
@SuppressWarnings("deprecation") // TODO DB_CONNECTION_STRING deprecation
@@ -351,6 +362,56 @@ void shouldExtractMultiQueryBatchAttributes() {
351362
assertThat(endAttributes.build().isEmpty()).isTrue();
352363
}
353364

365+
@Test
366+
void shouldExtractMixedParameterizedMultiQueryBatchAttributes() {
367+
// given
368+
Map<String, Object> request = new HashMap<>();
369+
request.put("db.namespace", "potatoes");
370+
request.put(
371+
"db.query.texts",
372+
asList("INSERT INTO potato VALUES('alice', ?)", "UPDATE potato SET name='bob' WHERE id=1"));
373+
request.put("db.query.parameterized", asList(true, false));
374+
request.put(DB_OPERATION_BATCH_SIZE.getKey(), 2L);
375+
376+
Context context = Context.root();
377+
378+
AttributesExtractor<Map<String, Object>, Void> underTest =
379+
SqlClientAttributesExtractor.create(new TestMultiAttributesGetter());
380+
381+
// when
382+
AttributesBuilder startAttributes = Attributes.builder();
383+
underTest.onStart(startAttributes, context, request);
384+
385+
AttributesBuilder endAttributes = Attributes.builder();
386+
underTest.onEnd(endAttributes, context, request, null, null);
387+
388+
// then
389+
if (emitStableDatabaseSemconv() && emitOldDatabaseSemconv()) {
390+
assertThat(startAttributes.build())
391+
.containsOnly(
392+
entry(DB_NAME, "potatoes"),
393+
entry(DB_NAMESPACE, "potatoes"),
394+
entry(
395+
DB_QUERY_TEXT,
396+
"INSERT INTO potato VALUES('alice', ?); UPDATE potato SET name=? WHERE id=?"),
397+
entry(DB_QUERY_SUMMARY, "BATCH"),
398+
entry(DB_OPERATION_BATCH_SIZE, 2L));
399+
} else if (emitOldDatabaseSemconv()) {
400+
assertThat(startAttributes.build()).containsOnly(entry(DB_NAME, "potatoes"));
401+
} else if (emitStableDatabaseSemconv()) {
402+
assertThat(startAttributes.build())
403+
.containsOnly(
404+
entry(DB_NAMESPACE, "potatoes"),
405+
entry(
406+
DB_QUERY_TEXT,
407+
"INSERT INTO potato VALUES('alice', ?); UPDATE potato SET name=? WHERE id=?"),
408+
entry(DB_QUERY_SUMMARY, "BATCH"),
409+
entry(DB_OPERATION_BATCH_SIZE, 2L));
410+
}
411+
412+
assertThat(endAttributes.build().isEmpty()).isTrue();
413+
}
414+
354415
@Test
355416
void shouldIgnoreBatchSizeOne() {
356417
// given

instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraRequest.java

Lines changed: 99 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,113 @@
55

66
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
77

8+
import static java.util.Collections.singleton;
9+
10+
import com.datastax.driver.core.BatchStatement;
11+
import com.datastax.driver.core.BoundStatement;
12+
import com.datastax.driver.core.RegularStatement;
813
import com.datastax.driver.core.Session;
14+
import com.datastax.driver.core.Statement;
915
import com.google.auto.value.AutoValue;
16+
import java.util.ArrayList;
17+
import java.util.Collection;
18+
import java.util.List;
19+
import javax.annotation.Nullable;
1020

1121
@AutoValue
1222
abstract class CassandraRequest {
1323

14-
public static CassandraRequest create(
15-
Session session, String queryText, boolean parameterizedQuery) {
16-
return new AutoValue_CassandraRequest(session, queryText, parameterizedQuery);
24+
static CassandraRequest create(Session session, String queryText, boolean parameterizedQuery) {
25+
return create(session, singleton(queryText), parameterizedQuery, null, null);
26+
}
27+
28+
static CassandraRequest create(Session session, String queryText) {
29+
return create(session, singleton(queryText), false, null, null);
30+
}
31+
32+
static CassandraRequest create(Session session, Statement statement) {
33+
if (statement instanceof BatchStatement) {
34+
return create(session, (BatchStatement) statement);
35+
}
36+
return create(
37+
session, singleton(getQuery(statement)), statement instanceof BoundStatement, null, null);
38+
}
39+
40+
private static CassandraRequest create(Session session, BatchStatement batchStatement) {
41+
List<String> queryTexts = new ArrayList<>();
42+
List<Boolean> mixedParameterizedQueries = null;
43+
boolean allQueriesParameterized = true;
44+
Boolean firstParameterizedQuery = null;
45+
int queryIndex = 0;
46+
for (Statement batchEntry : batchStatement.getStatements()) {
47+
queryTexts.add(getQuery(batchEntry));
48+
boolean parameterizedQuery = batchEntry instanceof BoundStatement;
49+
if (!parameterizedQuery) {
50+
allQueriesParameterized = false;
51+
}
52+
if (firstParameterizedQuery == null) {
53+
firstParameterizedQuery = parameterizedQuery;
54+
} else if (parameterizedQuery != firstParameterizedQuery
55+
&& mixedParameterizedQueries == null) {
56+
mixedParameterizedQueries = new ArrayList<>(batchStatement.size());
57+
for (int previousQueryIndex = 0; previousQueryIndex < queryIndex; previousQueryIndex++) {
58+
mixedParameterizedQueries.add(firstParameterizedQuery);
59+
}
60+
}
61+
if (mixedParameterizedQueries != null) {
62+
mixedParameterizedQueries.add(parameterizedQuery);
63+
}
64+
queryIndex++;
65+
}
66+
boolean allQueriesParameterizedResult = allQueriesParameterized;
67+
if (mixedParameterizedQueries == null && firstParameterizedQuery != null) {
68+
allQueriesParameterizedResult = firstParameterizedQuery;
69+
}
70+
return create(
71+
session,
72+
queryTexts,
73+
allQueriesParameterizedResult,
74+
mixedParameterizedQueries,
75+
Long.valueOf(batchStatement.size()));
76+
}
77+
78+
private static CassandraRequest create(
79+
Session session,
80+
Collection<String> queryTexts,
81+
boolean allQueriesParameterized,
82+
@Nullable List<Boolean> mixedParameterizedQueries,
83+
@Nullable Long batchSize) {
84+
return new AutoValue_CassandraRequest(
85+
session, queryTexts, allQueriesParameterized, mixedParameterizedQueries, batchSize);
86+
}
87+
88+
private static String getQuery(Statement statement) {
89+
String query = null;
90+
if (statement instanceof BoundStatement) {
91+
query = ((BoundStatement) statement).preparedStatement().getQueryString();
92+
} else if (statement instanceof RegularStatement) {
93+
query = ((RegularStatement) statement).getQueryString();
94+
}
95+
96+
return query == null ? "" : query;
1797
}
1898

19-
public abstract Session getSession();
99+
abstract Session getSession();
20100

21-
public abstract String getQueryText();
101+
abstract Collection<String> getQueryTexts();
102+
103+
abstract boolean allQueriesParameterized();
104+
105+
@Nullable
106+
abstract List<Boolean> mixedParameterizedQueries();
107+
108+
boolean isParameterizedQuery(int queryIndex) {
109+
List<Boolean> mixedParameterizedQueries = mixedParameterizedQueries();
110+
return mixedParameterizedQueries == null
111+
? allQueriesParameterized()
112+
: mixedParameterizedQueries.get(queryIndex);
113+
}
22114

23-
public abstract boolean isParameterizedQuery();
115+
@Nullable
116+
abstract Long getBatchSize();
24117
}

instrumentation/cassandra/cassandra-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/cassandra/v3_0/CassandraSqlAttributesGetter.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;
77

88
import static io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlDialect.DOUBLE_QUOTES_ARE_IDENTIFIERS;
9-
import static java.util.Collections.singleton;
109

1110
import com.datastax.driver.core.ExecutionInfo;
1211
import io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlClientAttributesGetter;
@@ -39,7 +38,13 @@ public String getDbNamespace(CassandraRequest request) {
3938

4039
@Override
4140
public Collection<String> getRawQueryTexts(CassandraRequest request) {
42-
return singleton(request.getQueryText());
41+
return request.getQueryTexts();
42+
}
43+
44+
@Override
45+
@Nullable
46+
public Long getDbOperationBatchSize(CassandraRequest request) {
47+
return request.getBatchSize();
4348
}
4449

4550
@Nullable
@@ -50,7 +55,7 @@ public InetSocketAddress getNetworkPeerInetSocketAddress(
5055
}
5156

5257
@Override
53-
public boolean isParameterizedQuery(CassandraRequest request) {
54-
return request.isParameterizedQuery();
58+
public boolean isParameterizedQuery(CassandraRequest request, int queryIndex) {
59+
return request.isParameterizedQuery(queryIndex);
5560
}
5661
}

0 commit comments

Comments
 (0)