Skip to content

Commit 5a47cc3

Browse files
authored
Add R2DBC batch size semconv attributes (#18980)
1 parent d0be67f commit 5a47cc3

5 files changed

Lines changed: 209 additions & 9 deletions

File tree

instrumentation/r2dbc-1.0/library/src/main/java/io/opentelemetry/instrumentation/r2dbc/v1_0/internal/DbExecution.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@
1111
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
1212
import static io.r2dbc.spi.ConnectionFactoryOptions.PROTOCOL;
1313
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;
14-
import static java.util.stream.Collectors.joining;
14+
import static java.util.stream.Collectors.toList;
1515

1616
import io.opentelemetry.context.Context;
1717
import io.r2dbc.proxy.core.QueryExecutionInfo;
1818
import io.r2dbc.proxy.core.QueryInfo;
1919
import io.r2dbc.spi.Connection;
2020
import io.r2dbc.spi.ConnectionFactoryOptions;
2121
import java.util.HashMap;
22+
import java.util.List;
2223
import java.util.Locale;
2324
import java.util.Map;
2425
import javax.annotation.Nullable;
@@ -64,7 +65,8 @@ private static Map<String, String> buildDriverToSystemName() {
6465
@Nullable private final String serverAddress;
6566
@Nullable private final Integer serverPort;
6667
private final String connectionString;
67-
private final String rawQueryText;
68+
private final List<String> rawQueryTexts;
69+
@Nullable private final Long batchSize;
6870
private final boolean parameterizedQuery;
6971

7072
@Nullable private Context context;
@@ -100,13 +102,15 @@ public DbExecution(QueryExecutionInfo queryInfo, ConnectionFactoryOptions factor
100102
protocol != null ? ":" + protocol : "",
101103
serverAddress != null ? "//" + serverAddress : "",
102104
serverPort != null ? ":" + serverPort : "");
103-
this.rawQueryText =
105+
this.rawQueryTexts =
104106
queryInfo.getQueries().stream()
105107
.map(QueryInfo::getQuery)
106108
.map(
107109
query ->
108110
R2dbcSqlCommenterUtil.getOriginalQuery(queryInfo.getConnectionInfo(), query))
109-
.collect(joining(";\n"));
111+
.collect(toList());
112+
int queryInfoBatchSize = queryInfo.getBatchSize();
113+
this.batchSize = queryInfoBatchSize > 1 ? (long) queryInfoBatchSize : null;
110114
this.parameterizedQuery =
111115
queryInfo.getQueries().stream()
112116
.anyMatch(queryInfo1 -> !queryInfo1.getBindingsList().isEmpty());
@@ -146,8 +150,13 @@ public String getConnectionString() {
146150
return connectionString;
147151
}
148152

149-
public String getRawQueryText() {
150-
return rawQueryText;
153+
public List<String> getRawQueryTexts() {
154+
return rawQueryTexts;
155+
}
156+
157+
@Nullable
158+
public Long getBatchSize() {
159+
return batchSize;
151160
}
152161

153162
public boolean isParameterizedQuery() {

instrumentation/r2dbc-1.0/library/src/main/java/io/opentelemetry/instrumentation/r2dbc/v1_0/internal/R2dbcSqlAttributesGetter.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package io.opentelemetry.instrumentation.r2dbc.v1_0.internal;
77

88
import static io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlDialect.DOUBLE_QUOTES_ARE_STRING_LITERALS;
9+
import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitStableDatabaseSemconv;
910
import static java.util.Collections.singleton;
1011

1112
import io.opentelemetry.instrumentation.api.incubator.semconv.db.SqlClientAttributesGetter;
@@ -64,7 +65,33 @@ public String getConnectionString(DbExecution request) {
6465

6566
@Override
6667
public Collection<String> getRawQueryTexts(DbExecution request) {
67-
return singleton(request.getRawQueryText());
68+
Collection<String> rawQueryTexts = request.getRawQueryTexts();
69+
// In old-only mode, join multi-query batches into a single query to preserve the legacy
70+
// db.statement and db.operation extraction behavior. In database/dup mode, favor stable
71+
// multi-query batch attributes because the shared SQL extractor can only use one raw query
72+
// collection.
73+
return emitStableDatabaseSemconv() || rawQueryTexts.size() == 1
74+
? rawQueryTexts
75+
: singleton(join(";\n", rawQueryTexts));
76+
}
77+
78+
private static String join(String delimiter, Collection<String> collection) {
79+
StringBuilder builder = new StringBuilder();
80+
for (String string : collection) {
81+
if (builder.length() != 0) {
82+
builder.append(delimiter);
83+
}
84+
builder.append(string);
85+
}
86+
return builder.toString();
87+
}
88+
89+
@Override
90+
@Nullable
91+
public Long getDbOperationBatchSize(DbExecution request) {
92+
// Batch size is a stable database semconv signal. Keep it hidden from old-only mode so legacy
93+
// extraction does not start treating existing requests as batches.
94+
return emitStableDatabaseSemconv() ? request.getBatchSize() : null;
6895
}
6996

7097
@Nullable

instrumentation/r2dbc-1.0/library/src/test/java/io/opentelemetry/instrumentation/r2dbc/v1_0/DbExecutionTest.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,45 @@ void dbExecution() {
4949
assertThat(dbExecution.getServerAddress()).isEqualTo("localhost");
5050
assertThat(dbExecution.getServerPort()).isEqualTo(3306);
5151
assertThat(dbExecution.getConnectionString()).isEqualTo("mariadb://localhost:3306");
52-
assertThat(dbExecution.getRawQueryText())
53-
.isEqualTo("SELECT * from person where last_name = 'tom'");
52+
assertThat(dbExecution.getRawQueryTexts())
53+
.containsExactly("SELECT * from person where last_name = 'tom'");
54+
assertThat(dbExecution.getBatchSize()).isNull();
55+
}
56+
57+
@Test
58+
void dbExecutionWithBatch() {
59+
QueryExecutionInfo queryExecutionInfo =
60+
MockQueryExecutionInfo.builder()
61+
.queryInfo(new QueryInfo("INSERT INTO person VALUES(1)"))
62+
.queryInfo(new QueryInfo("INSERT INTO person VALUES(2)"))
63+
.batchSize(2)
64+
.connectionInfo(MockConnectionInfo.builder().build())
65+
.build();
66+
ConnectionFactoryOptions factoryOptions =
67+
ConnectionFactoryOptions.parse("r2dbc:postgresql://localhost/db");
68+
69+
DbExecution dbExecution = new DbExecution(queryExecutionInfo, factoryOptions);
70+
71+
assertThat(dbExecution.getRawQueryTexts())
72+
.containsExactly("INSERT INTO person VALUES(1)", "INSERT INTO person VALUES(2)");
73+
assertThat(dbExecution.getBatchSize()).isEqualTo(2);
74+
}
75+
76+
@Test
77+
void dbExecutionWithBatchSizeOne() {
78+
QueryExecutionInfo queryExecutionInfo =
79+
MockQueryExecutionInfo.builder()
80+
.queryInfo(new QueryInfo("INSERT INTO person VALUES(1)"))
81+
.batchSize(1)
82+
.connectionInfo(MockConnectionInfo.builder().build())
83+
.build();
84+
ConnectionFactoryOptions factoryOptions =
85+
ConnectionFactoryOptions.parse("r2dbc:postgresql://localhost/db");
86+
87+
DbExecution dbExecution = new DbExecution(queryExecutionInfo, factoryOptions);
88+
89+
assertThat(dbExecution.getRawQueryTexts()).containsExactly("INSERT INTO person VALUES(1)");
90+
assertThat(dbExecution.getBatchSize()).isNull();
5491
}
5592

5693
@SuppressWarnings("deprecation") // testing deprecated semconv
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.r2dbc.v1_0;
7+
8+
import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitStableDatabaseSemconv;
9+
import static org.assertj.core.api.Assertions.assertThat;
10+
11+
import io.opentelemetry.instrumentation.r2dbc.v1_0.internal.DbExecution;
12+
import io.opentelemetry.instrumentation.r2dbc.v1_0.internal.R2dbcSqlAttributesGetter;
13+
import io.r2dbc.proxy.core.QueryExecutionInfo;
14+
import io.r2dbc.proxy.core.QueryInfo;
15+
import io.r2dbc.proxy.test.MockConnectionInfo;
16+
import io.r2dbc.proxy.test.MockQueryExecutionInfo;
17+
import io.r2dbc.spi.ConnectionFactoryOptions;
18+
import java.util.Collection;
19+
import org.junit.jupiter.api.Test;
20+
21+
class R2dbcSqlAttributesGetterTest {
22+
23+
private final R2dbcSqlAttributesGetter getter = new R2dbcSqlAttributesGetter();
24+
25+
@Test
26+
void rawQueryTextsForSingleQuery() {
27+
QueryExecutionInfo queryExecutionInfo =
28+
MockQueryExecutionInfo.builder()
29+
.queryInfo(new QueryInfo("INSERT INTO person VALUES(1)"))
30+
.connectionInfo(MockConnectionInfo.builder().build())
31+
.build();
32+
ConnectionFactoryOptions factoryOptions =
33+
ConnectionFactoryOptions.parse("r2dbc:postgresql://localhost/db");
34+
DbExecution dbExecution = new DbExecution(queryExecutionInfo, factoryOptions);
35+
36+
Collection<String> rawQueryTexts = getter.getRawQueryTexts(dbExecution);
37+
38+
assertThat(rawQueryTexts).isSameAs(dbExecution.getRawQueryTexts());
39+
assertThat(rawQueryTexts).containsExactly("INSERT INTO person VALUES(1)");
40+
}
41+
42+
@Test
43+
void rawQueryTextsForBatch() {
44+
QueryExecutionInfo queryExecutionInfo =
45+
MockQueryExecutionInfo.builder()
46+
.queryInfo(new QueryInfo("INSERT INTO person VALUES(1)"))
47+
.queryInfo(new QueryInfo("INSERT INTO person VALUES(2)"))
48+
.batchSize(2)
49+
.connectionInfo(MockConnectionInfo.builder().build())
50+
.build();
51+
ConnectionFactoryOptions factoryOptions =
52+
ConnectionFactoryOptions.parse("r2dbc:postgresql://localhost/db");
53+
DbExecution dbExecution = new DbExecution(queryExecutionInfo, factoryOptions);
54+
55+
Collection<String> rawQueryTexts = getter.getRawQueryTexts(dbExecution);
56+
57+
if (emitStableDatabaseSemconv()) {
58+
assertThat(rawQueryTexts)
59+
.containsExactly("INSERT INTO person VALUES(1)", "INSERT INTO person VALUES(2)");
60+
assertThat(getter.getDbOperationBatchSize(dbExecution)).isEqualTo(2);
61+
} else {
62+
assertThat(rawQueryTexts)
63+
.containsExactly("INSERT INTO person VALUES(1);\nINSERT INTO person VALUES(2)");
64+
assertThat(getter.getDbOperationBatchSize(dbExecution)).isNull();
65+
}
66+
}
67+
}

instrumentation/r2dbc-1.0/testing/src/main/java/io/opentelemetry/instrumentation/r2dbc/v1_0/AbstractR2dbcStatementTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
import static io.opentelemetry.instrumentation.testing.junit.service.SemconvServiceStabilityUtil.maybeStablePeerService;
1212
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
1313
import static io.opentelemetry.semconv.DbAttributes.DB_NAMESPACE;
14+
import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_BATCH_SIZE;
1415
import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_NAME;
1516
import static io.opentelemetry.semconv.DbAttributes.DB_QUERY_SUMMARY;
17+
import static io.opentelemetry.semconv.DbAttributes.DB_QUERY_TEXT;
1618
import static io.opentelemetry.semconv.DbAttributes.DB_SYSTEM_NAME;
1719
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
1820
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
@@ -30,6 +32,7 @@
3032
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
3133
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
3234
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;
35+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
3336
import static org.junit.jupiter.api.Named.named;
3437

3538
import com.google.errorprone.annotations.CanIgnoreReturnValue;
@@ -54,6 +57,7 @@
5457
import org.testcontainers.containers.GenericContainer;
5558
import org.testcontainers.containers.output.Slf4jLogConsumer;
5659
import org.testcontainers.containers.wait.strategy.Wait;
60+
import reactor.core.publisher.Flux;
5761
import reactor.core.publisher.Mono;
5862

5963
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@@ -294,6 +298,62 @@ void testMetrics() {
294298
SERVER_PORT);
295299
}
296300

301+
@Test
302+
void testBatchQueries() {
303+
assumeTrue(emitStableDatabaseSemconv());
304+
305+
DbSystemProps props = systems.get(MARIADB.system);
306+
startContainer(props);
307+
ConnectionFactory connectionFactory =
308+
createProxyConnectionFactory(
309+
ConnectionFactoryOptions.builder()
310+
.option(DRIVER, props.system)
311+
.option(HOST, container.getHost())
312+
.option(PORT, port)
313+
.option(USER, USER_DB)
314+
.option(PASSWORD, PW_DB)
315+
.option(DATABASE, DB)
316+
.option(CONNECT_TIMEOUT, Duration.ofSeconds(30))
317+
.build());
318+
319+
getTesting()
320+
.runWithSpan(
321+
"parent",
322+
() -> {
323+
Mono.from(connectionFactory.create())
324+
.flatMapMany(
325+
connection ->
326+
Flux.from(
327+
connection
328+
.createBatch()
329+
.add("SELECT 1")
330+
.add("SELECT 2")
331+
.execute())
332+
.flatMap(result -> result.map((row, metadata) -> ""))
333+
.concatWith(Mono.from(connection.close()).cast(String.class)))
334+
.blockLast(Duration.ofMinutes(1));
335+
});
336+
337+
getTesting()
338+
.waitAndAssertTraces(
339+
trace ->
340+
trace.hasSpansSatisfyingExactly(
341+
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
342+
span ->
343+
span.hasName("BATCH SELECT")
344+
.hasKind(SpanKind.CLIENT)
345+
.hasParent(trace.getSpan(0))
346+
.hasAttributesSatisfyingExactly(
347+
equalTo(DB_SYSTEM_NAME, MARIADB.system),
348+
equalTo(DB_NAMESPACE, DB),
349+
equalTo(DB_QUERY_TEXT, "SELECT ?"),
350+
equalTo(DB_QUERY_SUMMARY, "BATCH SELECT"),
351+
equalTo(DB_OPERATION_BATCH_SIZE, 2),
352+
equalTo(maybeStablePeerService(), "test-peer-service"),
353+
equalTo(SERVER_ADDRESS, container.getHost()),
354+
equalTo(SERVER_PORT, port))));
355+
}
356+
297357
private static class Parameter {
298358

299359
private final String system;

0 commit comments

Comments
 (0)