Skip to content

Commit 1d8b3de

Browse files
committed
Implement OtlpWriter
1 parent 69df037 commit 1d8b3de

2 files changed

Lines changed: 155 additions & 4 deletions

File tree

dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,12 @@ public final class ConfigDefaults {
108108
static final int DEFAULT_METRICS_OTEL_TIMEOUT = 7_500; // ms
109109
static final int DEFAULT_METRICS_OTEL_CARDINALITY_LIMIT = 2_000;
110110

111-
static final int DEFAULT_OTLP_TRACES_TIMEOUT = 10_000; // ms
111+
public static final int DEFAULT_OTLP_TRACES_TIMEOUT = 10_000; // ms
112112

113113
static final String DEFAULT_OTLP_HTTP_METRICS_ENDPOINT = "v1/metrics";
114-
static final String DEFAULT_OTLP_HTTP_TRACES_ENDPOINT = "v1/traces";
115-
static final String DEFAULT_OTLP_HTTP_PORT = "4318";
116-
static final String DEFAULT_OTLP_GRPC_PORT = "4317";
114+
public static final String DEFAULT_OTLP_HTTP_TRACES_ENDPOINT = "v1/traces";
115+
public static final String DEFAULT_OTLP_HTTP_PORT = "4318";
116+
public static final String DEFAULT_OTLP_GRPC_PORT = "4317";
117117

118118
static final int DEFAULT_DOGSTATSD_START_DELAY = 15; // seconds
119119

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package datadog.trace.common.writer;
2+
3+
import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_PORT;
4+
import static datadog.trace.api.ConfigDefaults.DEFAULT_OTLP_HTTP_TRACES_ENDPOINT;
5+
6+
import datadog.communication.ddagent.DroppingPolicy;
7+
import datadog.trace.api.config.OtlpConfig;
8+
import datadog.trace.common.sampling.SingleSpanSampler;
9+
import datadog.trace.common.writer.ddagent.Prioritization;
10+
import datadog.trace.core.monitor.HealthMetrics;
11+
import datadog.trace.core.otlp.common.OtlpGrpcSender;
12+
import datadog.trace.core.otlp.common.OtlpHttpSender;
13+
import datadog.trace.core.otlp.common.OtlpSender;
14+
import java.util.Collections;
15+
import java.util.Map;
16+
import java.util.concurrent.TimeUnit;
17+
18+
public class OtlpWriter extends RemoteWriter {
19+
20+
private static final int BUFFER_SIZE = 1024;
21+
private static final String TRACES_SIGNAL_PATH = "/" + DEFAULT_OTLP_HTTP_TRACES_ENDPOINT;
22+
private static final String DEFAULT_OTLP_HTTP_ENDPOINT =
23+
"http://localhost:" + DEFAULT_OTLP_HTTP_PORT + TRACES_SIGNAL_PATH;
24+
25+
public static OtlpWriterBuilder builder() {
26+
return new OtlpWriterBuilder();
27+
}
28+
29+
private final OtlpSender sender;
30+
31+
OtlpWriter(
32+
TraceProcessingWorker worker,
33+
PayloadDispatcher dispatcher,
34+
OtlpSender sender,
35+
HealthMetrics healthMetrics,
36+
int flushTimeout,
37+
TimeUnit flushTimeoutUnit,
38+
boolean alwaysFlush) {
39+
super(worker, dispatcher, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush);
40+
this.sender = sender;
41+
}
42+
43+
@Override
44+
public void close() {
45+
super.close();
46+
sender.shutdown();
47+
}
48+
49+
public static class OtlpWriterBuilder {
50+
private String endpoint = DEFAULT_OTLP_HTTP_ENDPOINT;
51+
private Map<String, String> headers = Collections.emptyMap();
52+
private int timeoutMillis = (int) TimeUnit.SECONDS.toMillis(10);
53+
private OtlpConfig.Protocol protocol = OtlpConfig.Protocol.HTTP_PROTOBUF;
54+
private OtlpConfig.Compression compression = OtlpConfig.Compression.NONE;
55+
private int traceBufferSize = BUFFER_SIZE;
56+
private HealthMetrics healthMetrics = HealthMetrics.NO_OP;
57+
private int flushIntervalMilliseconds = 1000;
58+
private int flushTimeout = 1;
59+
private TimeUnit flushTimeoutUnit = TimeUnit.SECONDS;
60+
private boolean alwaysFlush = false;
61+
private SingleSpanSampler singleSpanSampler;
62+
private OtlpSender sender;
63+
64+
public OtlpWriterBuilder endpoint(String endpoint) {
65+
this.endpoint = endpoint;
66+
return this;
67+
}
68+
69+
public OtlpWriterBuilder headers(Map<String, String> headers) {
70+
this.headers = headers;
71+
return this;
72+
}
73+
74+
public OtlpWriterBuilder timeoutMillis(int timeoutMillis) {
75+
this.timeoutMillis = timeoutMillis;
76+
return this;
77+
}
78+
79+
public OtlpWriterBuilder protocol(OtlpConfig.Protocol protocol) {
80+
this.protocol = protocol;
81+
return this;
82+
}
83+
84+
public OtlpWriterBuilder compression(OtlpConfig.Compression compression) {
85+
this.compression = compression;
86+
return this;
87+
}
88+
89+
public OtlpWriterBuilder traceBufferSize(int traceBufferSize) {
90+
this.traceBufferSize = traceBufferSize;
91+
return this;
92+
}
93+
94+
public OtlpWriterBuilder healthMetrics(HealthMetrics healthMetrics) {
95+
this.healthMetrics = healthMetrics;
96+
return this;
97+
}
98+
99+
public OtlpWriterBuilder flushIntervalMilliseconds(int flushIntervalMilliseconds) {
100+
this.flushIntervalMilliseconds = flushIntervalMilliseconds;
101+
return this;
102+
}
103+
104+
public OtlpWriterBuilder flushTimeout(int flushTimeout, TimeUnit flushTimeoutUnit) {
105+
this.flushTimeout = flushTimeout;
106+
this.flushTimeoutUnit = flushTimeoutUnit;
107+
return this;
108+
}
109+
110+
public OtlpWriterBuilder alwaysFlush(boolean alwaysFlush) {
111+
this.alwaysFlush = alwaysFlush;
112+
return this;
113+
}
114+
115+
public OtlpWriterBuilder spanSamplingRules(SingleSpanSampler singleSpanSampler) {
116+
this.singleSpanSampler = singleSpanSampler;
117+
return this;
118+
}
119+
120+
OtlpWriterBuilder sender(OtlpSender sender) {
121+
this.sender = sender;
122+
return this;
123+
}
124+
125+
public OtlpWriter build() {
126+
if (sender == null) {
127+
sender =
128+
protocol == OtlpConfig.Protocol.GRPC
129+
? new OtlpGrpcSender(
130+
endpoint, TRACES_SIGNAL_PATH, headers, timeoutMillis, compression)
131+
: new OtlpHttpSender(
132+
endpoint, TRACES_SIGNAL_PATH, headers, timeoutMillis, compression);
133+
}
134+
135+
final OtlpPayloadDispatcher dispatcher = new OtlpPayloadDispatcher(sender);
136+
final TraceProcessingWorker worker =
137+
new TraceProcessingWorker(
138+
traceBufferSize,
139+
healthMetrics,
140+
dispatcher,
141+
DroppingPolicy.DISABLED,
142+
Prioritization.ENSURE_TRACE,
143+
flushIntervalMilliseconds,
144+
TimeUnit.MILLISECONDS,
145+
singleSpanSampler);
146+
147+
return new OtlpWriter(
148+
worker, dispatcher, sender, healthMetrics, flushTimeout, flushTimeoutUnit, alwaysFlush);
149+
}
150+
}
151+
}

0 commit comments

Comments
 (0)