Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2246,26 +2246,20 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
def "test rum injection in head for mime #mime"() {
setup:
assumeTrue(testRumInjection())
def telemetryCollector = RumInjector.getTelemetryCollector()
def request = new Request.Builder().url(server.address().resolve("gimme-$mime").toURL())
.get().build()

when:
def response = client.newCall(request).execute()
def responseBody = response.body().string()
def finalSummary = telemetryCollector.summary()

then:
assert response.code() == 200
assert responseBody.contains(new String(RumInjector.get().getSnippetBytes("UTF-8"), "UTF-8")) == expected
assert response.header("x-datadog-rum-injected") == (expected ? "1" : null)

// Check a few telemetry metrics
if (expected) {
assert finalSummary.contains("injectionSucceed=")
assert responseBody.length() > 0
} else {
assert finalSummary.contains("injectionSkipped=")
}

where:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ private CoreTracer(

// Start RUM injector telemetry
if (InstrumenterConfig.get().isRumEnabled()) {
RumInjector.enableTelemetry(this.statsDClient);
RumInjector.enableTelemetry();
Comment thread
sarahchen6 marked this conversation as resolved.
Outdated
}

performanceMonitoring =
Expand Down
30 changes: 18 additions & 12 deletions internal-api/src/main/java/datadog/trace/api/rum/RumInjector.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,12 @@ public byte[] getMarkerBytes(String encoding) {
return this.markerCache.computeIfAbsent(encoding, MARKER_BYTES);
}

/**
* Starts telemetry collection and reports metrics via StatsDClient.
*
* @param statsDClient The StatsDClient to report metrics to.
*/
public static void enableTelemetry(datadog.trace.api.StatsDClient statsDClient) {
if (statsDClient != null) {
RumInjectorMetrics metrics = new RumInjectorMetrics(statsDClient);
/** Starts telemetry collection if RUM injection is enabled. */
public static void enableTelemetry() {
if (INSTANCE.isEnabled()) {
RumInjectorMetrics metrics = new RumInjectorMetrics();
Comment thread
sarahchen6 marked this conversation as resolved.
Outdated
telemetryCollector = metrics;

if (INSTANCE.isEnabled()) {
telemetryCollector.onInitializationSucceed();
}
telemetryCollector.onInitializationSucceed();
} else {
telemetryCollector = RumTelemetryCollector.NO_OP;
}
Expand Down Expand Up @@ -166,4 +159,17 @@ public static void setTelemetryCollector(RumTelemetryCollector collector) {
public static RumTelemetryCollector getTelemetryCollector() {
return telemetryCollector;
}

/**
* Gets the concrete RumInjectorMetrics instance.
*
* @return The RumInjectorMetrics instance or null if telemetry is NO_OP.
*/
public static RumInjectorMetrics getMetricsInstance() {
RumTelemetryCollector collector = telemetryCollector;
if (collector instanceof RumInjectorMetrics) {
return (RumInjectorMetrics) collector;
}
return null;
}
Comment thread
sarahchen6 marked this conversation as resolved.
Outdated
}
Original file line number Diff line number Diff line change
@@ -1,45 +1,33 @@
package datadog.trace.api.rum;

import datadog.trace.api.Config;
import datadog.trace.api.StatsDClient;
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import java.util.concurrent.atomic.AtomicLong;
import datadog.trace.api.telemetry.MetricCollector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* This class implements the RumTelemetryCollector interface, which is used to collect telemetry
* from the RumInjector. Metrics are then reported via StatsDClient with tagging.
* from the RumInjector. Metrics are then reported via the Datadog telemetry intake system.
*
* @see <a
* href="https://github.com/DataDog/dd-go/blob/prod/trace/apps/tracer-telemetry-intake/telemetry-metrics/static/common_metrics.json">common
* metrics and tags</a>
*/
public class RumInjectorMetrics implements RumTelemetryCollector {

private final AtomicLong injectionSucceed = new AtomicLong();
private final AtomicLong injectionFailed = new AtomicLong();
private final AtomicLong injectionSkipped = new AtomicLong();
private final AtomicLong contentSecurityPolicyDetected = new AtomicLong();
private final AtomicLong initializationSucceed = new AtomicLong();

private final StatsDClient statsd;
private final Queue<MetricCollector.Metric> metrics = new LinkedBlockingQueue<>();
private final Queue<MetricCollector.DistributionSeriesPoint> distributions =
new LinkedBlockingQueue<>();
Comment thread
sarahchen6 marked this conversation as resolved.
Outdated

private final String applicationId;
private final String remoteConfigUsed;

// Cache dependent on servlet version and content encoding
private final DDCache<String, String[]> succeedTagsCache = DDCaches.newFixedSizeCache(8);
private final DDCache<String, String[]> skippedTagsCache = DDCaches.newFixedSizeCache(8);
private final DDCache<String, String[]> cspTagsCache = DDCaches.newFixedSizeCache(8);
private final DDCache<String, String[]> responseTagsCache = DDCaches.newFixedSizeCache(8);
private final DDCache<String, String[]> timeTagsCache = DDCaches.newFixedSizeCache(8);
private final DDCache<String, String[]> failedTagsCache = DDCaches.newFixedSizeCache(16);

private static final String[] INIT_TAGS =
new String[] {"integration_name:servlet", "integration_version:N/A"};

public RumInjectorMetrics(final StatsDClient statsd) {
this.statsd = statsd;
public RumInjectorMetrics() {

// Get RUM config values (applicationId and remoteConfigUsed) for tagging
RumInjector rumInjector = RumInjector.get();
Expand All @@ -55,146 +43,141 @@ public RumInjectorMetrics(final StatsDClient statsd) {

@Override
public void onInjectionSucceed(String servletVersion) {
injectionSucceed.incrementAndGet();

String[] tags =
succeedTagsCache.computeIfAbsent(
servletVersion,
version ->
new String[] {
"application_id:" + applicationId,
"integration_name:servlet",
"integration_version:" + version,
"remote_config_used:" + remoteConfigUsed
});

statsd.count("rum.injection.succeed", 1, tags);
List<String> tags =
Arrays.asList(
"application_id:" + applicationId,
"integration_name:servlet",
"integration_version:" + servletVersion,
"remote_config_used:" + remoteConfigUsed);

MetricCollector.Metric metric =
new MetricCollector.Metric("rum", true, "injection.succeed", "count", 1, tags);
metrics.offer(metric);
}

@Override
public void onInjectionFailed(String servletVersion, String contentEncoding) {
injectionFailed.incrementAndGet();

String cacheKey = servletVersion + ":" + contentEncoding;
String[] tags =
failedTagsCache.computeIfAbsent(
cacheKey,
key -> {
if (contentEncoding != null) {
return new String[] {
"application_id:" + applicationId,
"content_encoding:" + contentEncoding,
"integration_name:servlet",
"integration_version:" + servletVersion,
"reason:failed_to_return_response_wrapper",
"remote_config_used:" + remoteConfigUsed
};
} else {
return new String[] {
"application_id:" + applicationId,
"integration_name:servlet",
"integration_version:" + servletVersion,
"reason:failed_to_return_response_wrapper",
"remote_config_used:" + remoteConfigUsed
};
}
});

statsd.count("rum.injection.failed", 1, tags);
List<String> tags = new ArrayList<>();
tags.add("application_id:" + applicationId);
if (contentEncoding != null) {
tags.add("content_encoding:" + contentEncoding);
}
tags.add("integration_name:servlet");
tags.add("integration_version:" + servletVersion);
tags.add("reason:failed_to_return_response_wrapper");
tags.add("remote_config_used:" + remoteConfigUsed);

MetricCollector.Metric metric =
new MetricCollector.Metric("rum", true, "injection.failed", "count", 1, tags);
metrics.offer(metric);
}

@Override
public void onInjectionSkipped(String servletVersion) {
injectionSkipped.incrementAndGet();

String[] tags =
skippedTagsCache.computeIfAbsent(
servletVersion,
version ->
new String[] {
"application_id:" + applicationId,
"integration_name:servlet",
"integration_version:" + version,
"reason:should_not_inject",
"remote_config_used:" + remoteConfigUsed
});

statsd.count("rum.injection.skipped", 1, tags);
List<String> tags =
Comment thread
sarahchen6 marked this conversation as resolved.
Arrays.asList(
"application_id:" + applicationId,
"integration_name:servlet",
"integration_version:" + servletVersion,
"reason:should_not_inject",
"remote_config_used:" + remoteConfigUsed);

MetricCollector.Metric metric =
new MetricCollector.Metric("rum", true, "injection.skipped", "count", 1, tags);
metrics.offer(metric);
}

@Override
public void onInitializationSucceed() {
initializationSucceed.incrementAndGet();
statsd.count("rum.injection.initialization.succeed", 1, INIT_TAGS);
List<String> tags = Arrays.asList("integration_name:servlet", "integration_version:N/A");

MetricCollector.Metric metric =
new MetricCollector.Metric(
"rum", true, "injection.initialization.succeed", "count", 1, tags);
metrics.offer(metric);
}

@Override
public void onContentSecurityPolicyDetected(String servletVersion) {
contentSecurityPolicyDetected.incrementAndGet();

String[] tags =
cspTagsCache.computeIfAbsent(
servletVersion,
version ->
new String[] {
"integration_name:servlet",
"integration_version:" + version,
"kind:header",
"reason:csp_header_found",
"status:seen"
});
statsd.count("rum.injection.content_security_policy", 1, tags);
List<String> tags =
Arrays.asList(
"integration_name:servlet",
"integration_version:" + servletVersion,
"kind:header",
"reason:csp_header_found",
"status:seen");

MetricCollector.Metric metric =
new MetricCollector.Metric(
"rum", true, "injection.content_security_policy", "count", 1, tags);
metrics.offer(metric);
}

@Override
public void onInjectionResponseSize(String servletVersion, long bytes) {
String[] tags =
responseTagsCache.computeIfAbsent(
servletVersion,
version ->
new String[] {
"integration_name:servlet",
"integration_version:" + version,
"response_kind:header"
});
statsd.distribution("rum.injection.response.bytes", bytes, tags);
List<String> tags =
Arrays.asList(
"integration_name:servlet",
"integration_version:" + servletVersion,
"response_kind:header");

MetricCollector.DistributionSeriesPoint distribution =
new MetricCollector.DistributionSeriesPoint(
"injection.response.bytes", true, "rum", (int) bytes, tags);
distributions.offer(distribution);
}

@Override
public void onInjectionTime(String servletVersion, long milliseconds) {
String[] tags =
timeTagsCache.computeIfAbsent(
servletVersion,
version -> new String[] {"integration_name:servlet", "integration_version:" + version});
statsd.distribution("rum.injection.ms", milliseconds, tags);
List<String> tags =
Arrays.asList("integration_name:servlet", "integration_version:" + servletVersion);

MetricCollector.DistributionSeriesPoint distribution =
new MetricCollector.DistributionSeriesPoint(
"injection.ms", true, "rum", (int) milliseconds, tags);
distributions.offer(distribution);
}

@Override
public void close() {
injectionSucceed.set(0);
injectionFailed.set(0);
injectionSkipped.set(0);
contentSecurityPolicyDetected.set(0);
initializationSucceed.set(0);

succeedTagsCache.clear();
skippedTagsCache.clear();
cspTagsCache.clear();
responseTagsCache.clear();
timeTagsCache.clear();
failedTagsCache.clear();
metrics.clear();
distributions.clear();
}

/**
* Drains all count metrics.
*
* @return Collection of metrics sent via telemetry
*/
public synchronized Collection<MetricCollector.Metric> drain() {
if (metrics.isEmpty()) {
return Collections.emptyList();
}

List<MetricCollector.Metric> drained = new ArrayList<>(metrics.size());
MetricCollector.Metric metric;
while ((metric = metrics.poll()) != null) {
drained.add(metric);
}
Comment thread
sarahchen6 marked this conversation as resolved.
Outdated
return drained;
}

public String summary() {
return "\ninitializationSucceed="
+ initializationSucceed.get()
+ "\ninjectionSucceed="
+ injectionSucceed.get()
+ "\ninjectionFailed="
+ injectionFailed.get()
+ "\ninjectionSkipped="
+ injectionSkipped.get()
+ "\ncontentSecurityPolicyDetected="
+ contentSecurityPolicyDetected.get();
/**
* Drains all distribution metrics.
*
* @return Collection of distribution points sent via telemetry
*/
public synchronized Collection<MetricCollector.DistributionSeriesPoint>
drainDistributionSeries() {
if (distributions.isEmpty()) {
return Collections.emptyList();
}

List<MetricCollector.DistributionSeriesPoint> drained = new ArrayList<>(distributions.size());
MetricCollector.DistributionSeriesPoint distribution;
while ((distribution = distributions.poll()) != null) {
drained.add(distribution);
}
Comment thread
sarahchen6 marked this conversation as resolved.
Outdated
return drained;
}
}
Loading