From 5554cd912976402f1955ebe609593ec95ab4f793 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Mon, 15 Jun 2026 15:04:20 -0700 Subject: [PATCH] Add InfluxDB batch size attribute --- .../v2_4/InfluxDbAttributesGetter.java | 12 ++- .../v2_4/InfluxDbImplInstrumentation.java | 95 ++++++++++++++++--- .../influxdb/v2_4/InfluxDbOperation.java | 14 ++- .../influxdb/v2_4/InfluxDbClientTest.java | 34 +++++-- .../influxdb/v2_4/InfluxDbClient24Test.java | 9 +- 5 files changed, 141 insertions(+), 23 deletions(-) diff --git a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbAttributesGetter.java b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbAttributesGetter.java index 4236ffe08adc..f5a3ff02daa5 100644 --- a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbAttributesGetter.java +++ b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbAttributesGetter.java @@ -13,7 +13,17 @@ final class InfluxDbAttributesGetter implements DbClientAttributesGetter onEnter( + @Advice.Argument(0) Object arg0, + @Advice.AllArguments Object[] arguments, + @Advice.FieldValue(value = "retrofit") Retrofit retrofit) { + CallDepth callDepth = CallDepth.forClass(InfluxDBImpl.class); + if (callDepth.getAndIncrement() > 0) { + return null; + } + + if (arg0 == null) { + return null; + } + + Context parentContext = currentContext(); + + HttpUrl httpUrl = retrofit.baseUrl(); + InfluxDbOperation influxDbOperation = + InfluxDbOperation.create( + httpUrl.host(), + httpUrl.port(), + getDatabase(arg0), + "write", + getOperationBatchSize(arguments)); + + if (!requestInstrumenter().shouldStart(parentContext, influxDbOperation)) { + return null; + } + + return InfluxDbScope.start(requestInstrumenter(), parentContext, influxDbOperation); + } + + @Nullable + public static String getDatabase(Object arg0) { + if (arg0 instanceof BatchPoints) { + return ((BatchPoints) arg0).getDatabase(); + } + // write data by UDP protocol, in this way, can't get database name. + if (arg0 instanceof Integer) { + return null; + } + return String.valueOf(arg0); + } + + @Nullable + public static Long getOperationBatchSize(Object[] arguments) { + if (arguments.length == 0) { + return null; + } + + Object pointsOrRecords = arguments[arguments.length - 1]; + + int batchSize; + if (pointsOrRecords instanceof BatchPoints) { + batchSize = ((BatchPoints) pointsOrRecords).getPoints().size(); + } else if (pointsOrRecords instanceof List) { + batchSize = ((List) pointsOrRecords).size(); + } else { + return null; + } + return batchSize > 1 ? (long) batchSize : null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false) + public static void onExit( + @Advice.Thrown @Nullable Throwable throwable, + @Advice.Enter @Nullable InfluxDbScope scope) { + CallDepth callDepth = CallDepth.forClass(InfluxDBImpl.class); + if (callDepth.decrementAndGet() > 0 || scope == null) { + return; + } + + scope.end(throwable); + } + } + @SuppressWarnings("unused") public static class InfluxDbModifyAdvice { @@ -136,25 +216,18 @@ public static InfluxDbScope onEnter( Context parentContext = currentContext(); HttpUrl httpUrl = retrofit.baseUrl(); - String database = - (arg0 instanceof BatchPoints) - ? ((BatchPoints) arg0).getDatabase() - // write data by UDP protocol, in this way, can't get database name. - : arg0 instanceof Integer ? null : String.valueOf(arg0); - String operationName; if ("createDatabase".equals(methodName)) { // createDatabase emits a CREATE DATABASE query. operationName = "CREATE DATABASE"; - } else if ("deleteDatabase".equals(methodName)) { + } else { // deleteDatabase emits a DROP DATABASE query. operationName = "DROP DATABASE"; - } else { - operationName = methodName; } InfluxDbOperation influxDbOperation = - InfluxDbOperation.create(httpUrl.host(), httpUrl.port(), database, operationName); + InfluxDbOperation.create( + httpUrl.host(), httpUrl.port(), String.valueOf(arg0), operationName); if (!requestInstrumenter().shouldStart(parentContext, influxDbOperation)) { return null; diff --git a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbOperation.java b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbOperation.java index 9a989c247aa5..d650f6819894 100644 --- a/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbOperation.java +++ b/instrumentation/influxdb-2.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbOperation.java @@ -13,7 +13,16 @@ public abstract class InfluxDbOperation { public static InfluxDbOperation create( String host, int port, @Nullable String namespace, @Nullable String operation) { - return new AutoValue_InfluxDbOperation(host, port, namespace, operation); + return create(host, port, namespace, operation, null); + } + + public static InfluxDbOperation create( + String host, + int port, + @Nullable String namespace, + @Nullable String operation, + @Nullable Long operationBatchSize) { + return new AutoValue_InfluxDbOperation(host, port, namespace, operation, operationBatchSize); } public abstract String getHost(); @@ -25,4 +34,7 @@ public static InfluxDbOperation create( @Nullable public abstract String getOperation(); + + @Nullable + public abstract Long getOperationBatchSize(); } diff --git a/instrumentation/influxdb-2.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClientTest.java b/instrumentation/influxdb-2.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClientTest.java index bec54faddc6f..ebcf6e2f5dce 100644 --- a/instrumentation/influxdb-2.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClientTest.java +++ b/instrumentation/influxdb-2.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClientTest.java @@ -9,6 +9,7 @@ import static io.opentelemetry.instrumentation.testing.junit.db.DbClientMetricsTestUtil.assertDurationMetric; import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_BATCH_SIZE; import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_NAME; @@ -124,7 +125,9 @@ void testQueryAndModifyWithOneArgument() { trace.hasSpansSatisfyingExactly( span -> span.hasName( - emitStableDatabaseSemconv() ? "write " + dbName : "WRITE " + dbName) + emitStableDatabaseSemconv() + ? "BATCH write " + dbName + : "WRITE " + dbName) .hasKind(SpanKind.CLIENT) .hasAttributesSatisfyingExactly( equalTo(maybeStable(DB_SYSTEM), INFLUXDB), @@ -133,7 +136,9 @@ void testQueryAndModifyWithOneArgument() { equalTo(SERVER_PORT, port), equalTo( maybeStable(DB_OPERATION), - emitStableDatabaseSemconv() ? "write" : "WRITE"))), + emitStableDatabaseSemconv() ? "BATCH write" : "WRITE"), + equalTo( + DB_OPERATION_BATCH_SIZE, emitStableDatabaseSemconv() ? 2L : null))), trace -> trace.hasSpansSatisfyingExactly( span -> @@ -367,6 +372,7 @@ void testWriteWithFourArguments() { String measurement = "cpu_load"; List records = new ArrayList<>(); records.add(measurement + ",atag=test1 idle=100,usertime=10,system=1 1485273600"); + records.add(measurement + ",atag=test2 idle=200,usertime=20,system=2 1485273601"); influxDb.write(DATABASE_NAME, "autogen", InfluxDB.ConsistencyLevel.ONE, records); testing.waitAndAssertTraces( @@ -375,7 +381,7 @@ void testWriteWithFourArguments() { span -> span.hasName( emitStableDatabaseSemconv() - ? "write " + DATABASE_NAME + ? "BATCH write " + DATABASE_NAME : "WRITE " + DATABASE_NAME) .hasKind(SpanKind.CLIENT) .hasAttributesSatisfyingExactly( @@ -385,7 +391,10 @@ void testWriteWithFourArguments() { equalTo(SERVER_PORT, port), equalTo( maybeStable(DB_OPERATION), - emitStableDatabaseSemconv() ? "write" : "WRITE")))); + emitStableDatabaseSemconv() ? "BATCH write" : "WRITE"), + equalTo( + DB_OPERATION_BATCH_SIZE, + emitStableDatabaseSemconv() ? 2L : null)))); } @Test @@ -393,6 +402,7 @@ void testWriteWithFiveArguments() { String measurement = "cpu_load"; List records = new ArrayList<>(); records.add(measurement + ",atag=test1 idle=100,usertime=10,system=1 1485273600"); + records.add(measurement + ",atag=test2 idle=200,usertime=20,system=2 1485273601"); influxDb.write(DATABASE_NAME, "autogen", InfluxDB.ConsistencyLevel.ONE, SECONDS, records); testing.waitAndAssertTraces( @@ -401,7 +411,7 @@ void testWriteWithFiveArguments() { span -> span.hasName( emitStableDatabaseSemconv() - ? "write " + DATABASE_NAME + ? "BATCH write " + DATABASE_NAME : "WRITE " + DATABASE_NAME) .hasKind(SpanKind.CLIENT) .hasAttributesSatisfyingExactly( @@ -411,7 +421,10 @@ void testWriteWithFiveArguments() { equalTo(SERVER_PORT, port), equalTo( maybeStable(DB_OPERATION), - emitStableDatabaseSemconv() ? "write" : "WRITE")))); + emitStableDatabaseSemconv() ? "BATCH write" : "WRITE"), + equalTo( + DB_OPERATION_BATCH_SIZE, + emitStableDatabaseSemconv() ? 2L : null)))); } @Test @@ -428,7 +441,9 @@ void testWriteWithUdp() { trace.hasSpansSatisfyingExactly( span -> span.hasName( - emitStableDatabaseSemconv() ? "write " + host + ":" + port : "WRITE") + emitStableDatabaseSemconv() + ? "BATCH write " + host + ":" + port + : "WRITE") .hasKind(SpanKind.CLIENT) .hasAttributesSatisfyingExactly( equalTo(maybeStable(DB_SYSTEM), INFLUXDB), @@ -437,6 +452,9 @@ void testWriteWithUdp() { equalTo(SERVER_PORT, port), equalTo( maybeStable(DB_OPERATION), - emitStableDatabaseSemconv() ? "write" : "WRITE")))); + emitStableDatabaseSemconv() ? "BATCH write" : "WRITE"), + equalTo( + DB_OPERATION_BATCH_SIZE, + emitStableDatabaseSemconv() ? 2000L : null)))); } } diff --git a/instrumentation/influxdb-2.4/javaagent/src/test24/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClient24Test.java b/instrumentation/influxdb-2.4/javaagent/src/test24/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClient24Test.java index ea81c8caa761..417ee971e088 100644 --- a/instrumentation/influxdb-2.4/javaagent/src/test24/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClient24Test.java +++ b/instrumentation/influxdb-2.4/javaagent/src/test24/java/io/opentelemetry/javaagent/instrumentation/influxdb/v2_4/InfluxDbClient24Test.java @@ -8,6 +8,7 @@ import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitStableDatabaseSemconv; import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_BATCH_SIZE; import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_NAME; @@ -114,7 +115,9 @@ void testQueryAndModifyWithOneArgument() { trace.hasSpansSatisfyingExactly( span -> span.hasName( - emitStableDatabaseSemconv() ? "write " + dbName : "WRITE " + dbName) + emitStableDatabaseSemconv() + ? "BATCH write " + dbName + : "WRITE " + dbName) .hasKind(SpanKind.CLIENT) .hasAttributesSatisfyingExactly( equalTo(maybeStable(DB_SYSTEM), INFLUXDB), @@ -123,7 +126,9 @@ void testQueryAndModifyWithOneArgument() { equalTo(SERVER_PORT, port), equalTo( maybeStable(DB_OPERATION), - emitStableDatabaseSemconv() ? "write" : "WRITE"))), + emitStableDatabaseSemconv() ? "BATCH write" : "WRITE"), + equalTo( + DB_OPERATION_BATCH_SIZE, emitStableDatabaseSemconv() ? 2L : null))), trace -> trace.hasSpansSatisfyingExactly( span ->