Skip to content

Commit 826630f

Browse files
committed
Alternative API Security sampling algorithm
1 parent 5978655 commit 826630f

9 files changed

Lines changed: 98 additions & 247 deletions

File tree

dd-java-agent/appsec/src/main/java/com/datadog/appsec/AppSecSystem.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.datadog.appsec.blocking.BlockingServiceImpl;
66
import com.datadog.appsec.config.AppSecConfigService;
77
import com.datadog.appsec.config.AppSecConfigServiceImpl;
8+
import com.datadog.appsec.config.TraceSegmentPostProcessor;
89
import com.datadog.appsec.ddwaf.WAFModule;
910
import com.datadog.appsec.event.EventDispatcher;
1011
import com.datadog.appsec.event.ReplaceableEventProducerService;
@@ -22,7 +23,6 @@
2223
import datadog.trace.api.telemetry.ProductChange;
2324
import datadog.trace.api.telemetry.ProductChangeCollector;
2425
import datadog.trace.bootstrap.ActiveSubsystems;
25-
import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor;
2626
import java.util.Collections;
2727
import java.util.HashMap;
2828
import java.util.List;
@@ -68,18 +68,6 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s
6868
EventDispatcher eventDispatcher = new EventDispatcher();
6969
REPLACEABLE_EVENT_PRODUCER.replaceEventProducerService(eventDispatcher);
7070

71-
ApiSecuritySampler requestSampler;
72-
if (Config.get().isApiSecurityEnabled()) {
73-
requestSampler = new ApiSecuritySamplerImpl();
74-
// When DD_API_SECURITY_ENABLED=true, ths post-processor is set even when AppSec is inactive.
75-
// This should be low overhead since the post-processor exits early if there's no AppSec
76-
// context.
77-
SpanPostProcessor.Holder.INSTANCE =
78-
new ApiSecurityProcessor(requestSampler, REPLACEABLE_EVENT_PRODUCER);
79-
} else {
80-
requestSampler = new ApiSecuritySampler.NoOp();
81-
}
82-
8371
ConfigurationPoller configurationPoller = sco.configurationPoller(config);
8472
// may throw and abort startup
8573
APP_SEC_CONFIG_SERVICE =
@@ -89,11 +77,15 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s
8977

9078
sco.createRemaining(config);
9179

80+
TraceSegmentPostProcessor apiSecurityPostProcessor =
81+
Config.get().isApiSecurityEnabled()
82+
? new ApiSecurityProcessor(new ApiSecuritySampler(), REPLACEABLE_EVENT_PRODUCER)
83+
: null;
9284
GatewayBridge gatewayBridge =
9385
new GatewayBridge(
9486
gw,
9587
REPLACEABLE_EVENT_PRODUCER,
96-
requestSampler,
88+
apiSecurityPostProcessor,
9789
APP_SEC_CONFIG_SERVICE.getTraceSegmentPostProcessors());
9890

9991
loadModules(eventDispatcher, sco.monitoring);

dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecurityProcessor.java

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
11
package com.datadog.appsec.api.security;
22

3+
import com.datadog.appsec.config.TraceSegmentPostProcessor;
34
import com.datadog.appsec.event.EventProducerService;
45
import com.datadog.appsec.event.ExpiredSubscriberInfoException;
56
import com.datadog.appsec.event.data.DataBundle;
67
import com.datadog.appsec.event.data.KnownAddresses;
78
import com.datadog.appsec.event.data.SingletonDataBundle;
89
import com.datadog.appsec.gateway.AppSecRequestContext;
910
import com.datadog.appsec.gateway.GatewayContext;
10-
import datadog.trace.api.gateway.RequestContext;
11-
import datadog.trace.api.gateway.RequestContextSlot;
11+
import com.datadog.appsec.report.AppSecEvent;
12+
import datadog.trace.api.Config;
13+
import datadog.trace.api.ProductTraceSource;
1214
import datadog.trace.api.internal.TraceSegment;
13-
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
14-
15+
import datadog.trace.bootstrap.instrumentation.api.Tags;
16+
import java.util.Collection;
1517
import java.util.Collections;
1618
import javax.annotation.Nonnull;
1719
import org.slf4j.Logger;
1820
import org.slf4j.LoggerFactory;
1921

20-
public class ApiSecurityProcessor {
22+
public class ApiSecurityProcessor implements TraceSegmentPostProcessor {
2123

2224
private static final Logger log = LoggerFactory.getLogger(ApiSecurityProcessor.class);
2325
private final ApiSecuritySampler sampler;
@@ -28,39 +30,22 @@ public ApiSecurityProcessor(ApiSecuritySampler sampler, EventProducerService pro
2830
this.producerService = producerService;
2931
}
3032

31-
public void process(@Nonnull AgentSpan span) {
32-
final RequestContext ctx_ = span.getRequestContext();
33-
if (ctx_ == null) {
33+
@Override
34+
public void processTraceSegment(
35+
TraceSegment segment, AppSecRequestContext ctx, Collection<AppSecEvent> collectedEvents) {
36+
if (segment == null || ctx == null) {
3437
return;
3538
}
36-
final AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
37-
if (ctx == null) {
39+
if (!sampler.sample(ctx)) {
40+
log.debug("Request not sampled, skipping API security post-processing");
3841
return;
3942
}
40-
41-
try {
42-
if (!sampler.sample(ctx)) {
43-
log.debug("Request not sampled, skipping API security post-processing");
44-
return;
45-
}
46-
log.debug("Request sampled, processing API security post-processing");
47-
extractSchemas(ctx, ctx_.getTraceSegment());
48-
} finally {
49-
ctx.setKeepOpenForApiSecurityPostProcessing(false);
50-
try {
51-
// XXX: Close the additive first. This is not strictly needed, but it'll prevent getting it
52-
// detected as a
53-
// missed request-ended event.
54-
ctx.closeWafContext();
55-
ctx.close();
56-
} catch (Exception e) {
57-
log.debug("Error closing AppSecRequestContext", e);
58-
}
59-
sampler.releaseOne();
60-
}
43+
log.debug("Request sampled, processing API security post-processing");
44+
extractSchemas(ctx, segment);
6145
}
6246

63-
private void extractSchemas(final AppSecRequestContext ctx, final TraceSegment traceSegment) {
47+
private void extractSchemas(
48+
final @Nonnull AppSecRequestContext ctx, final @Nonnull TraceSegment traceSegment) {
6449
final EventProducerService.DataSubscriberInfo sub =
6550
producerService.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR);
6651
if (sub == null || sub.isEmpty()) {
@@ -74,7 +59,12 @@ private void extractSchemas(final AppSecRequestContext ctx, final TraceSegment t
7459
try {
7560
GatewayContext gwCtx = new GatewayContext(false);
7661
producerService.publishDataEvent(sub, ctx, bundle, gwCtx);
77-
ctx.commitDerivatives(traceSegment);
62+
// TODO: Perhaps do this if schemas have actually been extracted (check when committing
63+
// derivatives)
64+
traceSegment.setTagTop(Tags.ASM_KEEP, true);
65+
if (!Config.get().isApmTracingEnabled()) {
66+
traceSegment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM);
67+
}
7868
} catch (ExpiredSubscriberInfoException e) {
7969
log.debug("Subscriber info expired", e);
8070
}

dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecuritySampler.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.datadog.appsec.gateway.AppSecRequestContext;
44
import datadog.trace.util.AgentTaskScheduler;
5-
65
import java.util.Random;
76
import java.util.concurrent.Executor;
87
import java.util.concurrent.atomic.AtomicBoolean;
@@ -11,10 +10,10 @@
1110
import java.util.concurrent.atomic.AtomicReference;
1211

1312
/**
14-
* Internal map for API Security sampling.
15-
* See "[RFC-1021] API Security Sampling Algorithm for thread-based concurrency".
13+
* Internal map for API Security sampling. See "[RFC-1021] API Security Sampling Algorithm for
14+
* thread-based concurrency".
1615
*/
17-
final public class ApiSecuritySampler {
16+
public class ApiSecuritySampler {
1817

1918
private static final int DEFAULT_MAX_ITEM_COUNT = 4096;
2019
private static final int DEFAULT_INTERVAL_SECONDS = 30;
@@ -28,10 +27,20 @@ final public class ApiSecuritySampler {
2827
private final long maxItemCount;
2928

3029
public ApiSecuritySampler() {
31-
this(DEFAULT_MAX_ITEM_COUNT, DEFAULT_INTERVAL_SECONDS, new Random().nextLong(), new DefaultMonotonicClock(), AgentTaskScheduler.INSTANCE);
30+
this(
31+
DEFAULT_MAX_ITEM_COUNT,
32+
DEFAULT_INTERVAL_SECONDS,
33+
new Random().nextLong(),
34+
new DefaultMonotonicClock(),
35+
AgentTaskScheduler.INSTANCE);
3236
}
3337

34-
public ApiSecuritySampler(final int maxItemCount, final int intervalSeconds, final long zero, final MonotonicClock clock, Executor executor) {
38+
public ApiSecuritySampler(
39+
final int maxItemCount,
40+
final int intervalSeconds,
41+
final long zero,
42+
final MonotonicClock clock,
43+
Executor executor) {
3544
table = new AtomicReference<>(new Table(maxItemCount));
3645
this.maxItemCount = maxItemCount;
3746
this.intervalSeconds = intervalSeconds;
@@ -154,7 +163,7 @@ public FindSlotResult findSlot(final long key) {
154163
index = 0;
155164
}
156165
} while (index != startIndex);
157-
return new FindSlotResult(table[(int)(maxItemCount * 2)], false);
166+
return new FindSlotResult(table[(int) (maxItemCount * 2)], false);
158167
}
159168

160169
static class FindSlotResult {

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java

Lines changed: 12 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,6 @@ public class AppSecRequestContext implements DataBundle, Closeable {
142142
// Used to detect missing request-end event at close.
143143
private volatile boolean requestEndCalled;
144144

145-
private volatile boolean keepOpenForApiSecurityPostProcessing;
146-
private volatile Long apiSecurityEndpointHash;
147-
148145
private static final AtomicIntegerFieldUpdater<AppSecRequestContext> WAF_TIMEOUTS_UPDATER =
149146
AtomicIntegerFieldUpdater.newUpdater(AppSecRequestContext.class, "wafTimeouts");
150147
private static final AtomicIntegerFieldUpdater<AppSecRequestContext> RASP_TIMEOUTS_UPDATER =
@@ -343,22 +340,6 @@ public void setRoute(String route) {
343340
this.route = route;
344341
}
345342

346-
public void setKeepOpenForApiSecurityPostProcessing(final boolean flag) {
347-
this.keepOpenForApiSecurityPostProcessing = flag;
348-
}
349-
350-
public boolean isKeepOpenForApiSecurityPostProcessing() {
351-
return this.keepOpenForApiSecurityPostProcessing;
352-
}
353-
354-
public void setApiSecurityEndpointHash(long hash) {
355-
this.apiSecurityEndpointHash = hash;
356-
}
357-
358-
public Long getApiSecurityEndpointHash() {
359-
return this.apiSecurityEndpointHash;
360-
}
361-
362343
void addRequestHeader(String name, String value) {
363344
if (finishedRequestHeaders) {
364345
throw new IllegalStateException("Request headers were said to be finished before");
@@ -554,23 +535,18 @@ public void close() {
554535
if (!requestEndCalled) {
555536
log.debug(SEND_TELEMETRY, "Request end event was not called before close");
556537
}
557-
// For API Security, we sometimes keep contexts open for late processing. In that case, this
558-
// flag needs to be
559-
// later reset by the API Security post-processor and close must be called again.
560-
if (!keepOpenForApiSecurityPostProcessing) {
561-
if (wafContext != null) {
562-
log.debug(
563-
SEND_TELEMETRY, "WAF object had not been closed (probably missed request-end event)");
564-
closeWafContext();
565-
}
566-
collectedCookies = null;
567-
requestHeaders.clear();
568-
responseHeaders.clear();
569-
persistentData.clear();
570-
if (derivatives != null) {
571-
derivatives.clear();
572-
derivatives = null;
573-
}
538+
if (wafContext != null) {
539+
log.debug(
540+
SEND_TELEMETRY, "WAF object had not been closed (probably missed request-end event)");
541+
closeWafContext();
542+
}
543+
collectedCookies = null;
544+
requestHeaders.clear();
545+
responseHeaders.clear();
546+
persistentData.clear();
547+
if (derivatives != null) {
548+
derivatives.clear();
549+
derivatives = null;
574550
}
575551
}
576552

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public class GatewayBridge {
8888

8989
private final SubscriptionService subscriptionService;
9090
private final EventProducerService producerService;
91-
private final ApiSecuritySampler requestSampler;
91+
private final TraceSegmentPostProcessor apiSecurityPostProcessor;
9292
private final List<TraceSegmentPostProcessor> traceSegmentPostProcessors;
9393

9494
// subscriber cache
@@ -114,11 +114,11 @@ public class GatewayBridge {
114114
public GatewayBridge(
115115
SubscriptionService subscriptionService,
116116
EventProducerService producerService,
117-
ApiSecuritySampler requestSampler,
117+
TraceSegmentPostProcessor apiSecurityPostProcessor,
118118
List<TraceSegmentPostProcessor> traceSegmentPostProcessors) {
119119
this.subscriptionService = subscriptionService;
120120
this.producerService = producerService;
121-
this.requestSampler = requestSampler;
121+
this.apiSecurityPostProcessor = apiSecurityPostProcessor;
122122
this.traceSegmentPostProcessors = traceSegmentPostProcessors;
123123
}
124124

@@ -679,22 +679,20 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {
679679
TraceSegment traceSeg = ctx_.getTraceSegment();
680680
Map<String, Object> tags = spanInfo.getTags();
681681

682-
if (maybeSampleForApiSecurity(ctx, spanInfo, tags)) {
683-
if (!Config.get().isApmTracingEnabled()) {
684-
traceSeg.setTagTop(Tags.ASM_KEEP, true);
685-
traceSeg.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM);
686-
}
687-
} else {
688-
ctx.closeWafContext();
689-
}
690-
691682
// AppSec report metric and events for web span only
692683
if (traceSeg != null) {
693684
traceSeg.setTagTop("_dd.appsec.enabled", 1);
694685
traceSeg.setTagTop("_dd.runtime_family", "jvm");
695686

696687
Collection<AppSecEvent> collectedEvents = ctx.transferCollectedEvents();
697688

689+
final Object route = tags.get(Tags.HTTP_ROUTE);
690+
if (route != null) {
691+
ctx.setRoute(route.toString());
692+
}
693+
// TODO: Move this to traceSegmentPostProcessors
694+
apiSecurityPostProcessor.processTraceSegment(traceSeg, ctx, null);
695+
698696
for (TraceSegmentPostProcessor pp : this.traceSegmentPostProcessors) {
699697
pp.processTraceSegment(traceSeg, ctx, collectedEvents);
700698
}
@@ -748,6 +746,7 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {
748746
writeRequestHeaders(
749747
traceSeg, DEFAULT_REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders(), false);
750748
}
749+
751750
// If extracted any derivatives - commit them
752751
if (!ctx.commitDerivatives(traceSeg)) {
753752
log.debug("Unable to commit, derivatives will be skipped {}", ctx.getDerivativeKeys());
@@ -765,21 +764,11 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {
765764
);
766765
}
767766

767+
ctx.closeWafContext();
768768
ctx.close();
769769
return NoopFlow.INSTANCE;
770770
}
771771

772-
private boolean maybeSampleForApiSecurity(
773-
AppSecRequestContext ctx, IGSpanInfo spanInfo, Map<String, Object> tags) {
774-
log.debug("Checking API Security for end of request handler on span: {}", spanInfo.getSpanId());
775-
// API Security sampling requires http.route tag.
776-
final Object route = tags.get(Tags.HTTP_ROUTE);
777-
if (route != null) {
778-
ctx.setRoute(route.toString());
779-
}
780-
return requestSampler.preSampleRequest(ctx);
781-
}
782-
783772
private Flow<Void> onRequestHeadersDone(RequestContext ctx_) {
784773
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
785774
if (ctx == null || ctx.isReqDataPublished()) {

0 commit comments

Comments
 (0)