Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -27,6 +28,7 @@
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
Expand Down Expand Up @@ -62,7 +64,7 @@ public static OkHttpClient buildHttpClient(final HttpUrl url, final long timeout
}

public static OkHttpClient buildHttpClient(
final boolean isHttp,
final boolean isPlainHttp,
final String unixDomainSocketPath,
final String namedPipe,
final long timeoutMillis) {
Expand All @@ -71,7 +73,30 @@ public static OkHttpClient buildHttpClient(
Config.get().isJdkSocketEnabled(),
namedPipe,
null,
isHttp,
isPlainHttp,
false,
null,
null,
null,
null,
null,
null,
timeoutMillis,
Config.get().isAgentConfiguredUsingDefault());
}

public static OkHttpClient buildHttp2Client(
final boolean isPlainHttp,
final String unixDomainSocketPath,
final String namedPipe,
final long timeoutMillis) {
return buildHttpClient(
unixDomainSocketPath,
Config.get().isJdkSocketEnabled(),
namedPipe,
null,
isPlainHttp,
true,
null,
null,
null,
Expand Down Expand Up @@ -99,6 +124,7 @@ public static OkHttpClient buildHttpClient(
config.getAgentNamedPipe(),
dispatcher,
isPlainHttp(url),
false,
retryOnConnectionFailure,
maxRunningRequests,
proxyHost,
Expand All @@ -116,7 +142,8 @@ private static OkHttpClient buildHttpClient(
final boolean useJdkUnixDomainSocket,
final String namedPipe,
final Dispatcher dispatcher,
final boolean isHttp,
final boolean isPlainHttp,
final boolean isHttp2,
final Boolean retryOnConnectionFailure,
final Integer maxRunningRequests,
final String proxyHost,
Expand Down Expand Up @@ -159,11 +186,19 @@ private static OkHttpClient buildHttpClient(
log.debug("Using NamedPipe as http transport");
}

if (isHttp) {
if (isPlainHttp) {
// force clear text when using http to avoid failures for JVMs without TLS
builder.connectionSpecs(Collections.singletonList(ConnectionSpec.CLEARTEXT));
}

if (isHttp2) {
if (isPlainHttp) {
builder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE));
} else {
builder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1));
}
}

if (retryOnConnectionFailure != null) {
builder.retryOnConnectionFailure(retryOnConnectionFailure);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package datadog.trace.bootstrap.otel.metrics;

import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_COUNTER;
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_GAUGE;
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER;

import javax.annotation.Nullable;

public final class OtelInstrumentBuilder {
Expand Down Expand Up @@ -91,11 +95,11 @@ public OtelInstrumentDescriptor observableDescriptor() {
private OtelInstrumentType observableType(OtelInstrumentType instrumentType) {
switch (instrumentType) {
case COUNTER:
return OtelInstrumentType.OBSERVABLE_COUNTER;
return OBSERVABLE_COUNTER;
case UP_DOWN_COUNTER:
return OtelInstrumentType.OBSERVABLE_UP_DOWN_COUNTER;
return OBSERVABLE_UP_DOWN_COUNTER;
case GAUGE:
return OtelInstrumentType.OBSERVABLE_GAUGE;
return OBSERVABLE_GAUGE;
default:
throw new IllegalArgumentException(instrumentType + " has no observable equivalent");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public final class OtelInstrumentDescriptor {
private final boolean longValues;
@Nullable private final UTF8BytesString description;
@Nullable private final UTF8BytesString unit;
private int hash;

public OtelInstrumentDescriptor(
String instrumentName,
Expand Down Expand Up @@ -64,11 +65,15 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
int result = instrumentName.toString().toLowerCase(Locale.ROOT).hashCode();
result = 31 * result + instrumentType.hashCode();
result = 31 * result + Boolean.hashCode(longValues);
result = 31 * result + Objects.hashCode(description);
result = 31 * result + Objects.hashCode(unit);
int result = hash;
if (result == 0) {
result = instrumentName.toString().toLowerCase(Locale.ROOT).hashCode();
result = 31 * result + instrumentType.hashCode();
result = 31 * result + Boolean.hashCode(longValues);
result = 31 * result + Objects.hashCode(description);
result = 31 * result + Objects.hashCode(unit);
hash = result;
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package datadog.trace.bootstrap.otel.metrics.data;

import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.COUNTER;
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.HISTOGRAM;
import static datadog.trace.bootstrap.otel.metrics.OtelInstrumentType.OBSERVABLE_COUNTER;

import datadog.logging.RatelimitedLogger;
import datadog.trace.api.Config;
import datadog.trace.api.config.OtlpConfig;
Expand Down Expand Up @@ -63,12 +67,10 @@ private static boolean shouldResetOnCollect(OtelInstrumentType type) {
switch (TEMPORALITY_PREFERENCE) {
case DELTA:
// gauges and up/down counters stay as cumulative
return type == OtelInstrumentType.HISTOGRAM
|| type == OtelInstrumentType.COUNTER
|| type == OtelInstrumentType.OBSERVABLE_COUNTER;
return type == HISTOGRAM || type == COUNTER || type == OBSERVABLE_COUNTER;
case LOWMEMORY:
// observable counters, gauges, and up/down counters stay as cumulative
return type == OtelInstrumentType.HISTOGRAM || type == OtelInstrumentType.COUNTER;
return type == HISTOGRAM || type == COUNTER;
case CUMULATIVE:
default:
return false;
Expand Down
36 changes: 23 additions & 13 deletions dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import datadog.trace.core.datastreams.DefaultDataStreamsMonitoring;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.core.monitor.TracerHealthMetrics;
import datadog.trace.core.otlp.metrics.OtlpMetricsService;
import datadog.trace.core.propagation.ExtractedContext;
import datadog.trace.core.propagation.HttpCodec;
import datadog.trace.core.propagation.InferredProxyPropagator;
Expand Down Expand Up @@ -786,19 +787,7 @@ private CoreTracer(
// asynchronously create the aggregator to avoid triggering expensive classloading during the
// tracer initialisation.
sharedCommunicationObjects.whenReady(
() ->
AgentTaskScheduler.get()
.execute(
() -> {
metricsAggregator = createMetricsAggregator(config, sco, this.healthMetrics);
// Schedule the metrics aggregator to begin reporting after a random delay of
// 1 to 10 seconds (using milliseconds granularity.)
// This avoids a fleet of traced applications starting at the same time from
// sending metrics in sync.
AgentTaskScheduler.get()
.scheduleWithJitter(
MetricsAggregator::start, metricsAggregator, 1, SECONDS);
}));
() -> AgentTaskScheduler.get().execute(() -> startMetricsAggregation(config, sco)));

if (dataStreamsMonitoring == null) {
this.dataStreamsMonitoring =
Expand Down Expand Up @@ -904,6 +893,20 @@ private CoreTracer(
}
}

private void startMetricsAggregation(Config config, SharedCommunicationObjects sco) {
metricsAggregator = createMetricsAggregator(config, sco, this.healthMetrics);
// Schedule the metrics aggregator to begin reporting after a random delay of
// 1 to 10 seconds (using milliseconds granularity.)
// This avoids a fleet of traced applications starting at the same time from
// sending metrics in sync.
AgentTaskScheduler.get()
.scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS);

if (config.isMetricsOtlpExporterEnabled()) {
OtlpMetricsService.INSTANCE.start();
}
}

/** Used by AgentTestRunner to inject configuration into the test tracer. */
public void rebuildTraceConfig(Config config) {
dynamicConfig
Expand Down Expand Up @@ -1415,6 +1418,9 @@ public void close() {
RumInjector.shutdownTelemetry();
AgentMeter.statsDClient().close();
metricsAggregator.close();
if (initialConfig.isMetricsOtlpExporterEnabled()) {
OtlpMetricsService.INSTANCE.shutdown();
}
dataStreamsMonitoring.close();
externalAgentLauncher.close();
healthMetrics.close();
Expand Down Expand Up @@ -1450,6 +1456,10 @@ public void flushMetrics() {
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.debug("Failed to wait for metrics flush.", e);
}

if (initialConfig.isMetricsOtlpExporterEnabled()) {
OtlpMetricsService.INSTANCE.flush();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package datadog.trace.core.otlp.common;

import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.BOOLEAN;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.BOOLEAN_ARRAY;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.DOUBLE;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.DOUBLE_ARRAY;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.LONG;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.LONG_ARRAY;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING;
import static datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor.STRING_ARRAY;
import static java.nio.charset.StandardCharsets.UTF_8;

import datadog.communication.serialization.GenerationalUtf8Cache;
Expand All @@ -8,7 +16,6 @@
import datadog.communication.serialization.StreamingBuffer;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor;
import java.nio.ByteBuffer;
import java.util.List;

Expand Down Expand Up @@ -149,28 +156,28 @@ public static void writeInstrumentationScope(
public static void writeAttribute(StreamingBuffer buf, int type, String key, Object value) {
byte[] keyUtf8 = keyUtf8(key);
switch (type) {
case OtlpAttributeVisitor.STRING:
case STRING:
writeStringAttribute(buf, keyUtf8, valueUtf8((String) value));
break;
case OtlpAttributeVisitor.BOOLEAN:
case BOOLEAN:
writeBooleanAttribute(buf, keyUtf8, (boolean) value);
break;
case OtlpAttributeVisitor.LONG:
case LONG:
writeLongAttribute(buf, keyUtf8, (long) value);
break;
case OtlpAttributeVisitor.DOUBLE:
case DOUBLE:
writeDoubleAttribute(buf, keyUtf8, (double) value);
break;
case OtlpAttributeVisitor.STRING_ARRAY:
case STRING_ARRAY:
writeStringArrayAttribute(buf, keyUtf8, (List<String>) value);
break;
case OtlpAttributeVisitor.BOOLEAN_ARRAY:
case BOOLEAN_ARRAY:
writeBooleanArrayAttribute(buf, keyUtf8, (List<Boolean>) value);
break;
case OtlpAttributeVisitor.LONG_ARRAY:
case LONG_ARRAY:
writeLongArrayAttribute(buf, keyUtf8, (List<Long>) value);
break;
case OtlpAttributeVisitor.DOUBLE_ARRAY:
case DOUBLE_ARRAY:
writeDoubleArrayAttribute(buf, keyUtf8, (List<Double>) value);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package datadog.trace.core.otlp.common;

import java.io.IOException;
import javax.annotation.Nonnull;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import okio.Buffer;
import okio.BufferedSink;
import okio.GzipSink;
import okio.Okio;

/** Wraps an {@link OtlpPayload} as a GRPC {@link RequestBody}. */
public final class OtlpGrpcRequestBody extends RequestBody {

private static final MediaType GRPC_MEDIA_TYPE = MediaType.parse("application/grpc");

private static final int HEADER_LENGTH = 5;

private static final byte UNCOMPRESSED_FLAG = 0;
private static final byte COMPRESSED_FLAG = 1;

private final OtlpPayload payload;
private final boolean gzip;

public OtlpGrpcRequestBody(OtlpPayload payload, boolean gzip) {
this.payload = payload;
this.gzip = gzip;
}

@Override
public long contentLength() {
return gzip ? -1 : HEADER_LENGTH + payload.getContentLength();
}

@Override
public MediaType contentType() {
return GRPC_MEDIA_TYPE;
}

@Override
public void writeTo(@Nonnull BufferedSink sink) throws IOException {
if (gzip) {
try (Buffer gzipBody = new Buffer()) {
try (BufferedSink gzipSink = Okio.buffer(new GzipSink(gzipBody))) {
payload.drain(gzipSink::write);
}
sink.writeByte(COMPRESSED_FLAG);
long gzipLength = gzipBody.size();
sink.writeInt((int) gzipLength);
sink.write(gzipBody, gzipLength);
}
} else {
sink.writeByte(UNCOMPRESSED_FLAG);
sink.writeInt(payload.getContentLength());
payload.drain(sink::write);
}
}
}
Loading
Loading