Skip to content

Commit 0e4a2ca

Browse files
authored
Support sending OpenTelemetry metrics over HTTP/protobuf (#11055)
Wrap OtlpPayload as an OkHttp RequestBody. Support sending OTLP data over HTTP. Periodically collect OpenTelemetry metrics and export them over OTLP. Fix temporality reported over OTLP. Avoid duplicate slash when extrapolating OTLP endpoint. Avoid repeated toLowerCase in OtelInstrumentDescriptor.hashCode. Add retry policy to OTLP exports. Clean up OTLP code. Support sending OTLP data over gRPC. Add random jitter when starting OtlpMetricsService; this avoids a fleet of apps starting at the same time from exporting OTLP metrics in sync. Avoid appending OTLP signal path to UDS file locations. Co-authored-by: stuart.mcculloch <stuart.mcculloch@datadoghq.com>
1 parent a1f4ec5 commit 0e4a2ca

File tree

22 files changed

+603
-94
lines changed

22 files changed

+603
-94
lines changed

communication/src/main/java/datadog/communication/http/OkHttpUtils.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.net.InetSocketAddress;
1616
import java.net.Proxy;
1717
import java.nio.ByteBuffer;
18+
import java.util.Arrays;
1819
import java.util.Collections;
1920
import java.util.List;
2021
import java.util.Map;
@@ -27,6 +28,7 @@
2728
import okhttp3.HttpUrl;
2829
import okhttp3.MediaType;
2930
import okhttp3.OkHttpClient;
31+
import okhttp3.Protocol;
3032
import okhttp3.Request;
3133
import okhttp3.RequestBody;
3234
import okhttp3.Response;
@@ -62,7 +64,7 @@ public static OkHttpClient buildHttpClient(final HttpUrl url, final long timeout
6264
}
6365

6466
public static OkHttpClient buildHttpClient(
65-
final boolean isHttp,
67+
final boolean isPlainHttp,
6668
final String unixDomainSocketPath,
6769
final String namedPipe,
6870
final long timeoutMillis) {
@@ -71,7 +73,30 @@ public static OkHttpClient buildHttpClient(
7173
Config.get().isJdkSocketEnabled(),
7274
namedPipe,
7375
null,
74-
isHttp,
76+
isPlainHttp,
77+
false,
78+
null,
79+
null,
80+
null,
81+
null,
82+
null,
83+
null,
84+
timeoutMillis,
85+
Config.get().isAgentConfiguredUsingDefault());
86+
}
87+
88+
public static OkHttpClient buildHttp2Client(
89+
final boolean isPlainHttp,
90+
final String unixDomainSocketPath,
91+
final String namedPipe,
92+
final long timeoutMillis) {
93+
return buildHttpClient(
94+
unixDomainSocketPath,
95+
Config.get().isJdkSocketEnabled(),
96+
namedPipe,
97+
null,
98+
isPlainHttp,
99+
true,
75100
null,
76101
null,
77102
null,
@@ -99,6 +124,7 @@ public static OkHttpClient buildHttpClient(
99124
config.getAgentNamedPipe(),
100125
dispatcher,
101126
isPlainHttp(url),
127+
false,
102128
retryOnConnectionFailure,
103129
maxRunningRequests,
104130
proxyHost,
@@ -116,7 +142,8 @@ private static OkHttpClient buildHttpClient(
116142
final boolean useJdkUnixDomainSocket,
117143
final String namedPipe,
118144
final Dispatcher dispatcher,
119-
final boolean isHttp,
145+
final boolean isPlainHttp,
146+
final boolean isHttp2,
120147
final Boolean retryOnConnectionFailure,
121148
final Integer maxRunningRequests,
122149
final String proxyHost,
@@ -159,11 +186,19 @@ private static OkHttpClient buildHttpClient(
159186
log.debug("Using NamedPipe as http transport");
160187
}
161188

162-
if (isHttp) {
189+
if (isPlainHttp) {
163190
// force clear text when using http to avoid failures for JVMs without TLS
164191
builder.connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT));
165192
}
166193

194+
if (isHttp2) {
195+
if (isPlainHttp) {
196+
builder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE));
197+
} else {
198+
builder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1));
199+
}
200+
}
201+
167202
if (retryOnConnectionFailure != null) {
168203
builder.retryOnConnectionFailure(retryOnConnectionFailure);
169204
}

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/OtelInstrumentBuilder.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package datadog.trace.bootstrap.otel.metrics;
22

3+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_COUNTER;
4+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_GAUGE;
5+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER;
6+
37
import javax.annotation.Nullable;
48

59
public final class OtelInstrumentBuilder {
@@ -91,11 +95,11 @@ public OtelInstrumentDescriptor observableDescriptor() {
9195
private OtelInstrumentType observableType(OtelInstrumentType instrumentType) {
9296
switch (instrumentType) {
9397
case COUNTER:
94-
return OtelInstrumentType.OBSERVABLE_COUNTER;
98+
return OBSERVABLE_COUNTER;
9599
case UP_DOWN_COUNTER:
96-
return OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER;
100+
return OBSERVABLE_UP_DOWN_COUNTER;
97101
case GAUGE:
98-
return OtelInstrumentType.OBSERVABLE_GAUGE;
102+
return OBSERVABLE_GAUGE;
99103
default:
100104
throw new IllegalArgumentException(instrumentType + " has no observable equivalent");
101105
}

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/OtelInstrumentDescriptor.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public final class OtelInstrumentDescriptor {
1212
private final boolean longValues;
1313
@Nullable private final UTF8BytesString description;
1414
@Nullable private final UTF8BytesString unit;
15+
private int hash;
1516

1617
public OtelInstrumentDescriptor(
1718
String instrumentName,
@@ -64,11 +65,15 @@ public boolean equals(Object o) {
6465

6566
@Override
6667
public int hashCode() {
67-
int result = instrumentName.toString().toLowerCase(Locale.ROOT).hashCode();
68-
result = 31 * result + instrumentType.hashCode();
69-
result = 31 * result + Boolean.hashCode(longValues);
70-
result = 31 * result + Objects.hashCode(description);
71-
result = 31 * result + Objects.hashCode(unit);
68+
int result = hash;
69+
if (result == 0) {
70+
result = instrumentName.toString().toLowerCase(Locale.ROOT).hashCode();
71+
result = 31 * result + instrumentType.hashCode();
72+
result = 31 * result + Boolean.hashCode(longValues);
73+
result = 31 * result + Objects.hashCode(description);
74+
result = 31 * result + Objects.hashCode(unit);
75+
hash = result;
76+
}
7277
return result;
7378
}
7479

dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/metrics/data/OtelMetricStorage.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package datadog.trace.bootstrap.otel.metrics.data;
22

3+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.COUNTER;
4+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.HISTOGRAM;
5+
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_COUNTER;
6+
37
import datadog.logging.RatelimitedLogger;
48
import datadog.trace.api.Config;
59
import datadog.trace.api.config.OtlpConfig;
@@ -63,12 +67,10 @@ private static boolean shouldResetOnCollect(OtelInstrumentType type) {
6367
switch (TEMPORALITY_PREFERENCE) {
6468
case DELTA:
6569
// gauges and up/down counters stay as cumulative
66-
return type == OtelInstrumentType.HISTOGRAM
67-
|| type == OtelInstrumentType.COUNTER
68-
|| type == OtelInstrumentType.OBSERVABLE_COUNTER;
70+
return type == HISTOGRAM || type == COUNTER || type == OBSERVABLE_COUNTER;
6971
case LOWMEMORY:
7072
// observable counters, gauges, and up/down counters stay as cumulative
71-
return type == OtelInstrumentType.HISTOGRAM || type == OtelInstrumentType.COUNTER;
73+
return type == HISTOGRAM || type == COUNTER;
7274
case CUMULATIVE:
7375
default:
7476
return false;

dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
import datadog.trace.core.datastreams.DefaultDataStreamsMonitoring;
9393
import datadog.trace.core.monitor.HealthMetrics;
9494
import datadog.trace.core.monitor.TracerHealthMetrics;
95+
import datadog.trace.core.otlp.metrics.OtlpMetricsService;
9596
import datadog.trace.core.propagation.ExtractedContext;
9697
import datadog.trace.core.propagation.HttpCodec;
9798
import datadog.trace.core.propagation.InferredProxyPropagator;
@@ -786,19 +787,7 @@ private CoreTracer(
786787
// asynchronously create the aggregator to avoid triggering expensive classloading during the
787788
// tracer initialisation.
788789
sharedCommunicationObjects.whenReady(
789-
() ->
790-
AgentTaskScheduler.get()
791-
.execute(
792-
() -> {
793-
metricsAggregator = createMetricsAggregator(config, sco, this.healthMetrics);
794-
// Schedule the metrics aggregator to begin reporting after a random delay of
795-
// 1 to 10 seconds (using milliseconds granularity.)
796-
// This avoids a fleet of traced applications starting at the same time from
797-
// sending metrics in sync.
798-
AgentTaskScheduler.get()
799-
.scheduleWithJitter(
800-
MetricsAggregator::start, metricsAggregator, 1, SECONDS);
801-
}));
790+
() -> AgentTaskScheduler.get().execute(() -> startMetricsAggregation(config, sco)));
802791

803792
if (dataStreamsMonitoring == null) {
804793
this.dataStreamsMonitoring =
@@ -904,6 +893,20 @@ private CoreTracer(
904893
}
905894
}
906895

896+
private void startMetricsAggregation(Config config, SharedCommunicationObjects sco) {
897+
metricsAggregator = createMetricsAggregator(config, sco, this.healthMetrics);
898+
// Schedule the metrics aggregator to begin reporting after a random delay of
899+
// 1 to 10 seconds (using milliseconds granularity.)
900+
// This avoids a fleet of traced applications starting at the same time from
901+
// sending metrics in sync.
902+
AgentTaskScheduler.get()
903+
.scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS);
904+
905+
if (config.isMetricsOtlpExporterEnabled()) {
906+
OtlpMetricsService.INSTANCE.start();
907+
}
908+
}
909+
907910
/** Used by AgentTestRunner to inject configuration into the test tracer. */
908911
public void rebuildTraceConfig(Config config) {
909912
dynamicConfig
@@ -1415,6 +1418,9 @@ public void close() {
14151418
RumInjector.shutdownTelemetry();
14161419
AgentMeter.statsDClient().close();
14171420
metricsAggregator.close();
1421+
if (initialConfig.isMetricsOtlpExporterEnabled()) {
1422+
OtlpMetricsService.INSTANCE.shutdown();
1423+
}
14181424
dataStreamsMonitoring.close();
14191425
externalAgentLauncher.close();
14201426
healthMetrics.close();
@@ -1450,6 +1456,10 @@ public void flushMetrics() {
14501456
} catch (InterruptedException | ExecutionException | TimeoutException e) {
14511457
log.debug("Failed to wait for metrics flush.", e);
14521458
}
1459+
1460+
if (initialConfig.isMetricsOtlpExporterEnabled()) {
1461+
OtlpMetricsService.INSTANCE.flush();
1462+
}
14531463
}
14541464

14551465
@Override

dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpCommonProto.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
package datadog.trace.core.otlp.common;
22

3+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.BOOLEAN;
4+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.BOOLEAN_ARRAY;
5+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.DOUBLE;
6+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.DOUBLE_ARRAY;
7+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.LONG;
8+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.LONG_ARRAY;
9+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING;
10+
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING_ARRAY;
311
import static java.nio.charset.StandardCharsets.UTF_8;
412

513
import datadog.communication.serialization.GenerationalUtf8Cache;
@@ -8,7 +16,6 @@
816
import datadog.communication.serialization.StreamingBuffer;
917
import datadog.trace.api.Config;
1018
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
11-
import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor;
1219
import java.nio.ByteBuffer;
1320
import java.util.List;
1421

@@ -149,28 +156,28 @@ public static void writeInstrumentationScope(
149156
public static void writeAttribute(StreamingBuffer buf, int type, String key, Object value) {
150157
byte[] keyUtf8 = keyUtf8(key);
151158
switch (type) {
152-
case OtlpAttributeVisitor.STRING:
159+
case STRING:
153160
writeStringAttribute(buf, keyUtf8, valueUtf8((String) value));
154161
break;
155-
case OtlpAttributeVisitor.BOOLEAN:
162+
case BOOLEAN:
156163
writeBooleanAttribute(buf, keyUtf8, (boolean) value);
157164
break;
158-
case OtlpAttributeVisitor.LONG:
165+
case LONG:
159166
writeLongAttribute(buf, keyUtf8, (long) value);
160167
break;
161-
case OtlpAttributeVisitor.DOUBLE:
168+
case DOUBLE:
162169
writeDoubleAttribute(buf, keyUtf8, (double) value);
163170
break;
164-
case OtlpAttributeVisitor.STRING_ARRAY:
171+
case STRING_ARRAY:
165172
writeStringArrayAttribute(buf, keyUtf8, (List<String>) value);
166173
break;
167-
case OtlpAttributeVisitor.BOOLEAN_ARRAY:
174+
case BOOLEAN_ARRAY:
168175
writeBooleanArrayAttribute(buf, keyUtf8, (List<Boolean>) value);
169176
break;
170-
case OtlpAttributeVisitor.LONG_ARRAY:
177+
case LONG_ARRAY:
171178
writeLongArrayAttribute(buf, keyUtf8, (List<Long>) value);
172179
break;
173-
case OtlpAttributeVisitor.DOUBLE_ARRAY:
180+
case DOUBLE_ARRAY:
174181
writeDoubleArrayAttribute(buf, keyUtf8, (List<Double>) value);
175182
break;
176183
default:
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package datadog.trace.core.otlp.common;
2+
3+
import java.io.IOException;
4+
import javax.annotation.Nonnull;
5+
import okhttp3.MediaType;
6+
import okhttp3.RequestBody;
7+
import okio.Buffer;
8+
import okio.BufferedSink;
9+
import okio.GzipSink;
10+
import okio.Okio;
11+
12+
/** Wraps an {@link OtlpPayload} as a GRPC {@link RequestBody}. */
13+
public final class OtlpGrpcRequestBody extends RequestBody {
14+
15+
private static final MediaType GRPC_MEDIA_TYPE = MediaType.parse("application/grpc");
16+
17+
private static final int HEADER_LENGTH = 5;
18+
19+
private static final byte UNCOMPRESSED_FLAG = 0;
20+
private static final byte COMPRESSED_FLAG = 1;
21+
22+
private final OtlpPayload payload;
23+
private final boolean gzip;
24+
25+
public OtlpGrpcRequestBody(OtlpPayload payload, boolean gzip) {
26+
this.payload = payload;
27+
this.gzip = gzip;
28+
}
29+
30+
@Override
31+
public long contentLength() {
32+
return gzip ? -1 : HEADER_LENGTH + payload.getContentLength();
33+
}
34+
35+
@Override
36+
public MediaType contentType() {
37+
return GRPC_MEDIA_TYPE;
38+
}
39+
40+
@Override
41+
public void writeTo(@Nonnull BufferedSink sink) throws IOException {
42+
if (gzip) {
43+
try (Buffer gzipBody = new Buffer()) {
44+
try (BufferedSink gzipSink = Okio.buffer(new GzipSink(gzipBody))) {
45+
payload.drain(gzipSink::write);
46+
}
47+
sink.writeByte(COMPRESSED_FLAG);
48+
long gzipLength = gzipBody.size();
49+
sink.writeInt((int) gzipLength);
50+
sink.write(gzipBody, gzipLength);
51+
}
52+
} else {
53+
sink.writeByte(UNCOMPRESSED_FLAG);
54+
sink.writeInt(payload.getContentLength());
55+
payload.drain(sink::write);
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)