Skip to content

Commit 7d10c73

Browse files
committed
Cleanup OTLP code
1 parent e5a7dcc commit 7d10c73

File tree

10 files changed

+89
-50
lines changed

10 files changed

+89
-50
lines changed

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/OtelInstrumentBuilder.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package datadog.trace.bootstrap.otel.metrics;
22

3+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_COUNTER;
4+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_GAUGE;
5+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER;
6+
37
import javax.annotation.Nullable;
48

59
public final class OtelInstrumentBuilder {
@@ -91,11 +95,11 @@ public OtelInstrumentDescriptor observableDescriptor() {
9195
private OtelInstrumentType observableType(OtelInstrumentType instrumentType) {
9296
switch (instrumentType) {
9397
case COUNTER:
94-
return OtelInstrumentType.OBSERVABLE_COUNTER;
98+
return OBSERVABLE_COUNTER;
9599
case UP_DOWN_COUNTER:
96-
return OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER;
100+
return OBSERVABLE_UP_DOWN_COUNTER;
97101
case GAUGE:
98-
return OtelInstrumentType.OBSERVABLE_GAUGE;
102+
return OBSERVABLE_GAUGE;
99103
default:
100104
throw new IllegalArgumentException(instrumentType + " has no observable equivalent");
101105
}

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/data/OtelMetricStorage.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package datadog.trace.bootstrap.otel.metrics.data;
22

3+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.COUNTER;
4+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.HISTOGRAM;
5+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_COUNTER;
6+
37
import datadog.logging.RatelimitedLogger;
48
import datadog.trace.api.Config;
59
import datadog.trace.api.config.OtlpConfig;
@@ -63,12 +67,10 @@ private static boolean shouldResetOnCollect(OtelInstrumentType type) {
6367
switch (TEMPORALITY_PREFERENCE) {
6468
case DELTA:
6569
// gauges and up/down counters stay as cumulative
66-
return type == OtelInstrumentType.HISTOGRAM
67-
|| type == OtelInstrumentType.COUNTER
68-
|| type == OtelInstrumentType.OBSERVABLE_COUNTER;
70+
return type == HISTOGRAM || type == COUNTER || type == OBSERVABLE_COUNTER;
6971
case LOWMEMORY:
7072
// observable counters, gauges, and up/down counters stay as cumulative
71-
return type == OtelInstrumentType.HISTOGRAM || type == OtelInstrumentType.COUNTER;
73+
return type == HISTOGRAM || type == COUNTER;
7274
case CUMULATIVE:
7375
default:
7476
return false;

dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1418,6 +1418,9 @@ public void close() {
14181418
RumInjector.shutdownTelemetry();
14191419
AgentMeter.statsDClient().close();
14201420
metricsAggregator.close();
1421+
if (initialConfig.isMetricsOtlpExporterEnabled()) {
1422+
OtlpMetricsService.INSTANCE.shutdown();
1423+
}
14211424
dataStreamsMonitoring.close();
14221425
externalAgentLauncher.close();
14231426
healthMetrics.close();

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpCommonProto.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
package datadog.trace.core.otlp.common;
22

3+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.BOOLEAN;
4+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.BOOLEAN_ARRAY;
5+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.DOUBLE;
6+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.DOUBLE_ARRAY;
7+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.LONG;
8+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.LONG_ARRAY;
9+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING;
10+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING_ARRAY;
311
import static java.nio.charset.StandardCharsets.UTF_8;
412

513
import datadog.communication.serialization.GenerationalUtf8Cache;
@@ -8,7 +16,6 @@
816
import datadog.communication.serialization.StreamingBuffer;
917
import datadog.trace.api.Config;
1018
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
11-
import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor;
1219
import java.nio.ByteBuffer;
1320
import java.util.List;
1421

@@ -149,28 +156,28 @@ public static void writeInstrumentationScope(
149156
public static void writeAttribute(StreamingBuffer buf, int type, String key, Object value) {
150157
byte[] keyUtf8 = keyUtf8(key);
151158
switch (type) {
152-
case OtlpAttributeVisitor.STRING:
159+
case STRING:
153160
writeStringAttribute(buf, keyUtf8, valueUtf8((String) value));
154161
break;
155-
case OtlpAttributeVisitor.BOOLEAN:
162+
case BOOLEAN:
156163
writeBooleanAttribute(buf, keyUtf8, (boolean) value);
157164
break;
158-
case OtlpAttributeVisitor.LONG:
165+
case LONG:
159166
writeLongAttribute(buf, keyUtf8, (long) value);
160167
break;
161-
case OtlpAttributeVisitor.DOUBLE:
168+
case DOUBLE:
162169
writeDoubleAttribute(buf, keyUtf8, (double) value);
163170
break;
164-
case OtlpAttributeVisitor.STRING_ARRAY:
171+
case STRING_ARRAY:
165172
writeStringArrayAttribute(buf, keyUtf8, (List<String>) value);
166173
break;
167-
case OtlpAttributeVisitor.BOOLEAN_ARRAY:
174+
case BOOLEAN_ARRAY:
168175
writeBooleanArrayAttribute(buf, keyUtf8, (List<Boolean>) value);
169176
break;
170-
case OtlpAttributeVisitor.LONG_ARRAY:
177+
case LONG_ARRAY:
171178
writeLongArrayAttribute(buf, keyUtf8, (List<Long>) value);
172179
break;
173-
case OtlpAttributeVisitor.DOUBLE_ARRAY:
180+
case DOUBLE_ARRAY:
174181
writeDoubleArrayAttribute(buf, keyUtf8, (List<Double>) value);
175182
break;
176183
default:

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpHttpRequestBody.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,25 @@
55
import okhttp3.MediaType;
66
import okhttp3.RequestBody;
77
import okio.BufferedSink;
8+
import okio.GzipSink;
9+
import okio.Okio;
810

911
/** Wraps an {@link OtlpPayload} as an OkHttp {@link RequestBody}. */
1012
public final class OtlpHttpRequestBody extends RequestBody {
1113

1214
private final OtlpPayload payload;
1315
private final MediaType mediaType;
16+
private final boolean gzip;
1417

15-
public OtlpHttpRequestBody(OtlpPayload payload) {
18+
public OtlpHttpRequestBody(OtlpPayload payload, boolean gzip) {
1619
this.payload = payload;
1720
this.mediaType = MediaType.get(payload.getContentType());
21+
this.gzip = gzip;
1822
}
1923

2024
@Override
2125
public long contentLength() {
22-
return payload.getContentLength();
26+
return gzip ? -1 : payload.getContentLength();
2327
}
2428

2529
@Override
@@ -29,6 +33,12 @@ public MediaType contentType() {
2933

3034
@Override
3135
public void writeTo(@Nonnull BufferedSink sink) throws IOException {
32-
payload.drain(sink::write);
36+
if (gzip) {
37+
try (BufferedSink gzipSink = Okio.buffer(new GzipSink(sink))) {
38+
payload.drain(gzipSink::write);
39+
}
40+
} else {
41+
payload.drain(sink::write);
42+
}
3343
}
3444
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpHttpSender.java

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package datadog.trace.core.otlp.common;
22

33
import static datadog.communication.http.OkHttpUtils.buildHttpClient;
4-
import static datadog.communication.http.OkHttpUtils.gzippedRequestBodyOf;
54
import static datadog.communication.http.OkHttpUtils.isPlainHttp;
65
import static datadog.communication.http.OkHttpUtils.sendWithRetries;
76

@@ -14,7 +13,6 @@
1413
import okhttp3.HttpUrl;
1514
import okhttp3.OkHttpClient;
1615
import okhttp3.Request;
17-
import okhttp3.RequestBody;
1816
import okhttp3.Response;
1917
import org.slf4j.Logger;
2018
import org.slf4j.LoggerFactory;
@@ -56,10 +54,8 @@ public OtlpHttpSender(
5654
this.client = buildHttpClient(isPlainHttp(url), unixDomainSocketPath, null, timeoutMillis);
5755
}
5856

57+
@Override
5958
public void send(OtlpPayload payload) {
60-
if (payload == OtlpPayload.EMPTY) {
61-
return; // nothing to send
62-
}
6359
Request request = makeRequest(payload);
6460
try (Response response = sendWithRetries(client, retryPolicy, request)) {
6561
if (!response.isSuccessful()) {
@@ -75,30 +71,20 @@ public void send(OtlpPayload payload) {
7571
}
7672
}
7773

74+
@Override
7875
public void shutdown() {
7976
client.connectionPool().evictAll();
8077
}
8178

8279
private Request makeRequest(OtlpPayload payload) {
83-
Request.Builder requestBuilder =
84-
new Request.Builder().url(url).header("Content-Type", payload.getContentType());
85-
80+
Request.Builder requestBuilder = new Request.Builder().url(url);
8681
if (gzip) {
87-
requestBuilder
88-
.header("Content-Length", "-1")
89-
.header("Content-Encoding", "gzip")
90-
.header("Transfer-Encoding", "chunked");
91-
} else {
92-
requestBuilder.header("Content-Length", String.valueOf(payload.getContentLength()));
82+
requestBuilder.header("Content-Encoding", "gzip").header("Transfer-Encoding", "chunked");
9383
}
9484

85+
// add configured headers to the request
9586
headers.forEach(requestBuilder::addHeader);
9687

97-
RequestBody requestBody = new OtlpHttpRequestBody(payload);
98-
if (gzip) {
99-
requestBody = gzippedRequestBodyOf(requestBody);
100-
}
101-
102-
return requestBuilder.post(requestBody).build();
88+
return requestBuilder.post(new OtlpHttpRequestBody(payload, gzip)).build();
10389
}
10490
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
public final class OtlpResourceProto {
1919
private OtlpResourceProto() {}
2020

21-
public static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get());
22-
2321
private static final Set<String> IGNORED_GLOBAL_TAGS =
2422
new HashSet<>(
2523
Arrays.asList(
@@ -30,6 +28,8 @@ private OtlpResourceProto() {}
3028
"deployment.environment.name",
3129
"service.version"));
3230

31+
public static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get());
32+
3333
static byte[] buildResourceMessage(Config config) {
3434
GrowableBuffer buf = new GrowableBuffer(512);
3535

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpSender.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,6 @@
33
/** Sends chunks of OTLP data. */
44
public interface OtlpSender {
55
void send(OtlpPayload payload);
6+
7+
void shutdown();
68
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsCollector.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,7 @@
44

55
/** Collects metrics ready for export. */
66
public interface OtlpMetricsCollector {
7+
OtlpMetricsCollector NOOP_COLLECTOR = () -> OtlpPayload.EMPTY;
8+
79
OtlpPayload collectMetrics();
810
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsService.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@
44

55
import datadog.trace.api.Config;
66
import datadog.trace.core.otlp.common.OtlpHttpSender;
7+
import datadog.trace.core.otlp.common.OtlpPayload;
78
import datadog.trace.core.otlp.common.OtlpSender;
89
import datadog.trace.util.AgentTaskScheduler;
910
import java.util.concurrent.TimeUnit;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
1013

1114
/** Periodic service to collect OpenTelemetry metrics and export them over OTLP. */
1215
public final class OtlpMetricsService {
16+
private static final Logger LOGGER = LoggerFactory.getLogger(OtlpMetricsService.class);
17+
1318
public static final OtlpMetricsService INSTANCE = new OtlpMetricsService(Config.get());
1419

1520
private final AgentTaskScheduler scheduler;
@@ -20,14 +25,23 @@ public final class OtlpMetricsService {
2025

2126
private OtlpMetricsService(Config config) {
2227
this.scheduler = new AgentTaskScheduler(OTLP_METRICS_EXPORTER);
23-
this.collector = OtlpMetricsProtoCollector.INSTANCE;
24-
this.sender =
25-
new OtlpHttpSender(
26-
config.getOtlpMetricsEndpoint(),
27-
"/v1/metrics",
28-
config.getOtlpMetricsHeaders(),
29-
config.getOtlpMetricsTimeout(),
30-
config.getOtlpMetricsCompression());
28+
29+
switch (config.getOtlpMetricsProtocol()) {
30+
case HTTP_PROTOBUF:
31+
this.collector = OtlpMetricsProtoCollector.INSTANCE;
32+
this.sender =
33+
new OtlpHttpSender(
34+
config.getOtlpMetricsEndpoint(),
35+
"/v1/metrics",
36+
config.getOtlpMetricsHeaders(),
37+
config.getOtlpMetricsTimeout(),
38+
config.getOtlpMetricsCompression());
39+
break;
40+
default:
41+
LOGGER.debug("Unsupported OTLP metrics protocol: {}", config.getOtlpMetricsProtocol());
42+
this.collector = OtlpMetricsCollector.NOOP_COLLECTOR;
43+
this.sender = null;
44+
}
3145

3246
this.intervalMillis = config.getMetricsOtelInterval();
3347
}
@@ -41,7 +55,16 @@ public void flush() {
4155
scheduler.execute(this::export);
4256
}
4357

58+
public void shutdown() {
59+
if (sender != null) {
60+
sender.shutdown();
61+
}
62+
}
63+
4464
private void export() {
45-
sender.send(collector.collectMetrics());
65+
OtlpPayload payload = collector.collectMetrics();
66+
if (payload != OtlpPayload.EMPTY) {
67+
sender.send(payload);
68+
}
4669
}
4770
}

0 commit comments

Comments
 (0)