Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@ final class InfluxDbAttributesGetter implements DbClientAttributesGetter<InfluxD
@Nullable
@Override
public String getDbOperationName(InfluxDbOperation request) {
return request.getOperation();
String operation = request.getOperation();
if (operation == null) {
return null;
}
return request.getOperationBatchSize() == null ? operation : "BATCH " + operation;
}

@Nullable
@Override
public Long getDbOperationBatchSize(InfluxDbOperation request) {
return request.getOperationBatchSize();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.opentelemetry.javaagent.bootstrap.CallDepth;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.List;
import javax.annotation.Nullable;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -59,7 +60,7 @@ public void transform(TypeTransformer transformer) {
.and(takesArgument(1, String.class))
.and(takesArgument(2, isEnum()))
.and(takesArgument(3, named("java.util.concurrent.TimeUnit"))))),
getClass().getName() + "$InfluxDbModifyAdvice");
getClass().getName() + "$InfluxDbWriteAdvice");
transformer.applyAdviceToMethod(
namedOneOf("createDatabase", "deleteDatabase"),
getClass().getName() + "$InfluxDbModifyAdvice");
Expand Down Expand Up @@ -116,6 +117,85 @@ public static void onExit(
}
}

@SuppressWarnings("unused")
public static class InfluxDbWriteAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
public static InfluxDbScope<InfluxDbOperation> onEnter(
@Advice.Argument(0) Object arg0,
@Advice.AllArguments Object[] arguments,
@Advice.FieldValue(value = "retrofit") Retrofit retrofit) {
CallDepth callDepth = CallDepth.forClass(InfluxDBImpl.class);
if (callDepth.getAndIncrement() > 0) {
return null;
}

if (arg0 == null) {
return null;
}

Context parentContext = currentContext();

HttpUrl httpUrl = retrofit.baseUrl();
InfluxDbOperation influxDbOperation =
InfluxDbOperation.create(
httpUrl.host(),
httpUrl.port(),
getDatabase(arg0),
"write",
getOperationBatchSize(arguments));

if (!requestInstrumenter().shouldStart(parentContext, influxDbOperation)) {
return null;
}

return InfluxDbScope.start(requestInstrumenter(), parentContext, influxDbOperation);
}

@Nullable
public static String getDatabase(Object arg0) {
if (arg0 instanceof BatchPoints) {
return ((BatchPoints) arg0).getDatabase();
}
// write data by UDP protocol, in this way, can't get database name.
if (arg0 instanceof Integer) {
return null;
}
return String.valueOf(arg0);
}

@Nullable
public static Long getOperationBatchSize(Object[] arguments) {
if (arguments.length == 0) {
return null;
}

Object pointsOrRecords = arguments[arguments.length - 1];

int batchSize;
if (pointsOrRecords instanceof BatchPoints) {
batchSize = ((BatchPoints) pointsOrRecords).getPoints().size();
} else if (pointsOrRecords instanceof List) {
batchSize = ((List<?>) pointsOrRecords).size();
} else {
return null;
}
return batchSize > 1 ? (long) batchSize : null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false)
public static void onExit(
@Advice.Thrown @Nullable Throwable throwable,
@Advice.Enter @Nullable InfluxDbScope<InfluxDbOperation> scope) {
CallDepth callDepth = CallDepth.forClass(InfluxDBImpl.class);
if (callDepth.decrementAndGet() > 0 || scope == null) {
return;
}

scope.end(throwable);
}
}

@SuppressWarnings("unused")
public static class InfluxDbModifyAdvice {

Expand All @@ -136,25 +216,18 @@ public static InfluxDbScope<InfluxDbOperation> onEnter(
Context parentContext = currentContext();

HttpUrl httpUrl = retrofit.baseUrl();
String database =
(arg0 instanceof BatchPoints)
? ((BatchPoints) arg0).getDatabase()
// write data by UDP protocol, in this way, can't get database name.
: arg0 instanceof Integer ? null : String.valueOf(arg0);

String operationName;
if ("createDatabase".equals(methodName)) {
// createDatabase emits a CREATE DATABASE query.
operationName = "CREATE DATABASE";
} else if ("deleteDatabase".equals(methodName)) {
} else {
// deleteDatabase emits a DROP DATABASE query.
operationName = "DROP DATABASE";
} else {
operationName = methodName;
}

InfluxDbOperation influxDbOperation =
InfluxDbOperation.create(httpUrl.host(), httpUrl.port(), database, operationName);
InfluxDbOperation.create(
httpUrl.host(), httpUrl.port(), String.valueOf(arg0), operationName);

if (!requestInstrumenter().shouldStart(parentContext, influxDbOperation)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,16 @@ public abstract class InfluxDbOperation {

public static InfluxDbOperation create(
String host, int port, @Nullable String namespace, @Nullable String operation) {
return new AutoValue_InfluxDbOperation(host, port, namespace, operation);
return create(host, port, namespace, operation, null);
}

public static InfluxDbOperation create(
String host,
int port,
@Nullable String namespace,
@Nullable String operation,
@Nullable Long operationBatchSize) {
return new AutoValue_InfluxDbOperation(host, port, namespace, operation, operationBatchSize);
}

public abstract String getHost();
Expand All @@ -25,4 +34,7 @@ public static InfluxDbOperation create(

@Nullable
public abstract String getOperation();

@Nullable
public abstract Long getOperationBatchSize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.opentelemetry.instrumentation.testing.junit.db.DbClientMetricsTestUtil.assertDurationMetric;
import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_BATCH_SIZE;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_NAME;
Expand Down Expand Up @@ -124,7 +125,9 @@ void testQueryAndModifyWithOneArgument() {
trace.hasSpansSatisfyingExactly(
span ->
span.hasName(
emitStableDatabaseSemconv() ? "write " + dbName : "WRITE " + dbName)
emitStableDatabaseSemconv()
? "BATCH write " + dbName
: "WRITE " + dbName)
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(maybeStable(DB_SYSTEM), INFLUXDB),
Expand All @@ -133,7 +136,9 @@ void testQueryAndModifyWithOneArgument() {
equalTo(SERVER_PORT, port),
equalTo(
maybeStable(DB_OPERATION),
emitStableDatabaseSemconv() ? "write" : "WRITE"))),
emitStableDatabaseSemconv() ? "BATCH write" : "WRITE"),
equalTo(
DB_OPERATION_BATCH_SIZE, emitStableDatabaseSemconv() ? 2L : null))),
trace ->
trace.hasSpansSatisfyingExactly(
span ->
Expand Down Expand Up @@ -367,6 +372,7 @@ void testWriteWithFourArguments() {
String measurement = "cpu_load";
List<String> records = new ArrayList<>();
records.add(measurement + ",atag=test1 idle=100,usertime=10,system=1 1485273600");
records.add(measurement + ",atag=test2 idle=200,usertime=20,system=2 1485273601");
influxDb.write(DATABASE_NAME, "autogen", InfluxDB.ConsistencyLevel.ONE, records);

testing.waitAndAssertTraces(
Expand All @@ -375,7 +381,7 @@ void testWriteWithFourArguments() {
span ->
span.hasName(
emitStableDatabaseSemconv()
? "write " + DATABASE_NAME
? "BATCH write " + DATABASE_NAME
: "WRITE " + DATABASE_NAME)
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
Expand All @@ -385,14 +391,18 @@ void testWriteWithFourArguments() {
equalTo(SERVER_PORT, port),
equalTo(
maybeStable(DB_OPERATION),
emitStableDatabaseSemconv() ? "write" : "WRITE"))));
emitStableDatabaseSemconv() ? "BATCH write" : "WRITE"),
equalTo(
DB_OPERATION_BATCH_SIZE,
emitStableDatabaseSemconv() ? 2L : null))));
}

@Test
void testWriteWithFiveArguments() {
String measurement = "cpu_load";
List<String> records = new ArrayList<>();
records.add(measurement + ",atag=test1 idle=100,usertime=10,system=1 1485273600");
records.add(measurement + ",atag=test2 idle=200,usertime=20,system=2 1485273601");
influxDb.write(DATABASE_NAME, "autogen", InfluxDB.ConsistencyLevel.ONE, SECONDS, records);

testing.waitAndAssertTraces(
Expand All @@ -401,7 +411,7 @@ void testWriteWithFiveArguments() {
span ->
span.hasName(
emitStableDatabaseSemconv()
? "write " + DATABASE_NAME
? "BATCH write " + DATABASE_NAME
: "WRITE " + DATABASE_NAME)
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
Expand All @@ -411,7 +421,10 @@ void testWriteWithFiveArguments() {
equalTo(SERVER_PORT, port),
equalTo(
maybeStable(DB_OPERATION),
emitStableDatabaseSemconv() ? "write" : "WRITE"))));
emitStableDatabaseSemconv() ? "BATCH write" : "WRITE"),
equalTo(
DB_OPERATION_BATCH_SIZE,
emitStableDatabaseSemconv() ? 2L : null))));
}

@Test
Expand All @@ -428,7 +441,9 @@ void testWriteWithUdp() {
trace.hasSpansSatisfyingExactly(
span ->
span.hasName(
emitStableDatabaseSemconv() ? "write " + host + ":" + port : "WRITE")
emitStableDatabaseSemconv()
? "BATCH write " + host + ":" + port
: "WRITE")
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(maybeStable(DB_SYSTEM), INFLUXDB),
Expand All @@ -437,6 +452,9 @@ void testWriteWithUdp() {
equalTo(SERVER_PORT, port),
equalTo(
maybeStable(DB_OPERATION),
emitStableDatabaseSemconv() ? "write" : "WRITE"))));
emitStableDatabaseSemconv() ? "BATCH write" : "WRITE"),
equalTo(
DB_OPERATION_BATCH_SIZE,
emitStableDatabaseSemconv() ? 2000L : null))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.opentelemetry.instrumentation.api.internal.SemconvStability.emitStableDatabaseSemconv;
import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_BATCH_SIZE;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_NAME;
Expand Down Expand Up @@ -114,7 +115,9 @@ void testQueryAndModifyWithOneArgument() {
trace.hasSpansSatisfyingExactly(
span ->
span.hasName(
emitStableDatabaseSemconv() ? "write " + dbName : "WRITE " + dbName)
emitStableDatabaseSemconv()
? "BATCH write " + dbName
: "WRITE " + dbName)
.hasKind(SpanKind.CLIENT)
.hasAttributesSatisfyingExactly(
equalTo(maybeStable(DB_SYSTEM), INFLUXDB),
Expand All @@ -123,7 +126,9 @@ void testQueryAndModifyWithOneArgument() {
equalTo(SERVER_PORT, port),
equalTo(
maybeStable(DB_OPERATION),
emitStableDatabaseSemconv() ? "write" : "WRITE"))),
emitStableDatabaseSemconv() ? "BATCH write" : "WRITE"),
equalTo(
DB_OPERATION_BATCH_SIZE, emitStableDatabaseSemconv() ? 2L : null))),
trace ->
trace.hasSpansSatisfyingExactly(
span ->
Expand Down
Loading