Skip to content

Commit 274c553

Browse files
committed
Support sending OTLP data over GRPC
1 parent 7d10c73 commit 274c553

File tree

4 files changed

+198
-4
lines changed

4 files changed

+198
-4
lines changed

communication/src/main/java/datadog/communication/http/OkHttpUtils.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.net.InetSocketAddress;
1616
import java.net.Proxy;
1717
import java.nio.ByteBuffer;
18+
import java.util.Arrays;
1819
import java.util.Collections;
1920
import java.util.List;
2021
import java.util.Map;
@@ -27,6 +28,7 @@
2728
import okhttp3.HttpUrl;
2829
import okhttp3.MediaType;
2930
import okhttp3.OkHttpClient;
31+
import okhttp3.Protocol;
3032
import okhttp3.Request;
3133
import okhttp3.RequestBody;
3234
import okhttp3.Response;
@@ -62,7 +64,7 @@ public static OkHttpClient buildHttpClient(final HttpUrl url, final long timeout
6264
}
6365

6466
public static OkHttpClient buildHttpClient(
65-
final boolean isHttp,
67+
final boolean isPlainHttp,
6668
final String unixDomainSocketPath,
6769
final String namedPipe,
6870
final long timeoutMillis) {
@@ -71,7 +73,30 @@ public static OkHttpClient buildHttpClient(
7173
Config.get().isJdkSocketEnabled(),
7274
namedPipe,
7375
null,
74-
isHttp,
76+
isPlainHttp,
77+
false,
78+
null,
79+
null,
80+
null,
81+
null,
82+
null,
83+
null,
84+
timeoutMillis,
85+
Config.get().isAgentConfiguredUsingDefault());
86+
}
87+
88+
public static OkHttpClient buildHttp2Client(
89+
final boolean isPlainHttp,
90+
final String unixDomainSocketPath,
91+
final String namedPipe,
92+
final long timeoutMillis) {
93+
return buildHttpClient(
94+
unixDomainSocketPath,
95+
Config.get().isJdkSocketEnabled(),
96+
namedPipe,
97+
null,
98+
isPlainHttp,
99+
true,
75100
null,
76101
null,
77102
null,
@@ -99,6 +124,7 @@ public static OkHttpClient buildHttpClient(
99124
config.getAgentNamedPipe(),
100125
dispatcher,
101126
isPlainHttp(url),
127+
false,
102128
retryOnConnectionFailure,
103129
maxRunningRequests,
104130
proxyHost,
@@ -116,7 +142,8 @@ private static OkHttpClient buildHttpClient(
116142
final boolean useJdkUnixDomainSocket,
117143
final String namedPipe,
118144
final Dispatcher dispatcher,
119-
final boolean isHttp,
145+
final boolean isPlainHttp,
146+
final boolean isHttp2,
120147
final Boolean retryOnConnectionFailure,
121148
final Integer maxRunningRequests,
122149
final String proxyHost,
@@ -159,11 +186,19 @@ private static OkHttpClient buildHttpClient(
159186
log.debug("Using NamedPipe as http transport");
160187
}
161188

162-
if (isHttp) {
189+
if (isPlainHttp) {
163190
// force clear text when using http to avoid failures for JVMs without TLS
164191
builder.connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT));
165192
}
166193

194+
if (isHttp2) {
195+
if (isPlainHttp) {
196+
builder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE));
197+
} else {
198+
builder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1));
199+
}
200+
}
201+
167202
if (retryOnConnectionFailure != null) {
168203
builder.retryOnConnectionFailure(retryOnConnectionFailure);
169204
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package datadog.trace.core.otlp.common;
2+
3+
import java.io.IOException;
4+
import javax.annotation.Nonnull;
5+
import okhttp3.MediaType;
6+
import okhttp3.RequestBody;
7+
import okio.Buffer;
8+
import okio.BufferedSink;
9+
import okio.GzipSink;
10+
import okio.Okio;
11+
12+
/** Wraps an {@link OtlpPayload} as a GRPC {@link RequestBody}. */
13+
public final class OtlpGrpcRequestBody extends RequestBody {
14+
15+
private static final MediaType GRPC_MEDIA_TYPE = MediaType.parse("application/grpc");
16+
17+
private static final int HEADER_LENGTH = 5;
18+
19+
private static final byte UNCOMPRESSED_FLAG = 0;
20+
private static final byte COMPRESSED_FLAG = 1;
21+
22+
private final OtlpPayload payload;
23+
private final boolean gzip;
24+
25+
public OtlpGrpcRequestBody(OtlpPayload payload, boolean gzip) {
26+
this.payload = payload;
27+
this.gzip = gzip;
28+
}
29+
30+
@Override
31+
public long contentLength() {
32+
return gzip ? -1 : HEADER_LENGTH + payload.getContentLength();
33+
}
34+
35+
@Override
36+
public MediaType contentType() {
37+
return GRPC_MEDIA_TYPE;
38+
}
39+
40+
@Override
41+
public void writeTo(@Nonnull BufferedSink sink) throws IOException {
42+
if (gzip) {
43+
try (Buffer gzipBody = new Buffer()) {
44+
try (BufferedSink gzipSink = Okio.buffer(new GzipSink(gzipBody))) {
45+
payload.drain(gzipSink::write);
46+
}
47+
sink.writeByte(COMPRESSED_FLAG);
48+
long gzipLength = gzipBody.size();
49+
sink.writeInt((int) gzipLength);
50+
sink.write(gzipBody, gzipLength);
51+
}
52+
} else {
53+
sink.writeByte(UNCOMPRESSED_FLAG);
54+
sink.writeInt(payload.getContentLength());
55+
payload.drain(sink::write);
56+
}
57+
}
58+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package datadog.trace.core.otlp.common;
2+
3+
import static datadog.communication.http.OkHttpUtils.buildGrpcClient;
4+
import static datadog.communication.http.OkHttpUtils.isPlainHttp;
5+
import static datadog.communication.http.OkHttpUtils.sendWithRetries;
6+
7+
import datadog.communication.http.HttpRetryPolicy;
8+
import datadog.logging.RatelimitedLogger;
9+
import datadog.trace.api.config.OtlpConfig.Compression;
10+
import java.io.IOException;
11+
import java.util.Map;
12+
import java.util.concurrent.TimeUnit;
13+
import okhttp3.HttpUrl;
14+
import okhttp3.OkHttpClient;
15+
import okhttp3.Request;
16+
import okhttp3.Response;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
20+
/** Sends chunks of OTLP data over GRPC. */
21+
public final class OtlpGrpcSender implements OtlpSender {
22+
private static final Logger LOGGER = LoggerFactory.getLogger(OtlpGrpcSender.class);
23+
private static final RatelimitedLogger RATELIMITED_LOGGER =
24+
new RatelimitedLogger(LOGGER, 5, TimeUnit.MINUTES);
25+
26+
private final HttpRetryPolicy.Factory retryPolicy =
27+
new HttpRetryPolicy.Factory(5, 100, 2.0, true);
28+
29+
private final HttpUrl url;
30+
private final Map<String, String> headers;
31+
private final boolean gzip;
32+
33+
private final OkHttpClient client;
34+
35+
public OtlpGrpcSender(
36+
String endpoint,
37+
String signalPath,
38+
Map<String, String> headers,
39+
int timeoutMillis,
40+
Compression compression) {
41+
42+
String unixDomainSocketPath;
43+
if (endpoint.startsWith("unix://")) {
44+
unixDomainSocketPath = endpoint.substring(7);
45+
this.url = HttpUrl.get("http://localhost:4317" + signalPath);
46+
} else {
47+
unixDomainSocketPath = null;
48+
this.url = HttpUrl.get(endpoint + signalPath);
49+
}
50+
51+
this.headers = headers;
52+
this.gzip = compression == Compression.GZIP;
53+
54+
this.client = buildGrpcClient(isPlainHttp(url), unixDomainSocketPath, null, timeoutMillis);
55+
}
56+
57+
@Override
58+
public void send(OtlpPayload payload) {
59+
Request request = makeRequest(payload);
60+
try (Response response = sendWithRetries(client, retryPolicy, request)) {
61+
if (!response.isSuccessful()) {
62+
RATELIMITED_LOGGER.warn(
63+
"OTLP export to {} failed with status {}: {}",
64+
request.url(),
65+
response.code(),
66+
response.message());
67+
}
68+
} catch (IOException e) {
69+
RATELIMITED_LOGGER.warn(
70+
"OTLP export to {} failed with exception: {}", request.url(), e.toString());
71+
}
72+
}
73+
74+
@Override
75+
public void shutdown() {
76+
client.connectionPool().evictAll();
77+
}
78+
79+
private Request makeRequest(OtlpPayload payload) {
80+
Request.Builder requestBuilder = new Request.Builder().url(url);
81+
if (gzip) {
82+
requestBuilder.header("grpc-encoding", "gzip");
83+
}
84+
85+
// add configured headers to the request
86+
headers.forEach(requestBuilder::addHeader);
87+
88+
return requestBuilder.post(new OtlpGrpcRequestBody(payload, gzip)).build();
89+
}
90+
}

dd-trace-core/src/main/java/datadog/trace/core/otlp/metrics/OtlpMetricsService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static datadog.trace.util.AgentThreadFactory.AgentThread.OTLP_METRICS_EXPORTER;
44

55
import datadog.trace.api.Config;
6+
import datadog.trace.core.otlp.common.OtlpGrpcSender;
67
import datadog.trace.core.otlp.common.OtlpHttpSender;
78
import datadog.trace.core.otlp.common.OtlpPayload;
89
import datadog.trace.core.otlp.common.OtlpSender;
@@ -27,6 +28,16 @@ private OtlpMetricsService(Config config) {
2728
this.scheduler = new AgentTaskScheduler(OTLP_METRICS_EXPORTER);
2829

2930
switch (config.getOtlpMetricsProtocol()) {
31+
case GRPC:
32+
this.collector = OtlpMetricsProtoCollector.INSTANCE;
33+
this.sender =
34+
new OtlpGrpcSender(
35+
config.getOtlpMetricsEndpoint(),
36+
"/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
37+
config.getOtlpMetricsHeaders(),
38+
config.getOtlpMetricsTimeout(),
39+
config.getOtlpMetricsCompression());
40+
break;
3041
case HTTP_PROTOBUF:
3142
this.collector = OtlpMetricsProtoCollector.INSTANCE;
3243
this.sender =

0 commit comments

Comments
 (0)