Skip to content

Commit f68f9b9

Browse files
feat: add capped sampling rate increases (#10715)
Add capped sampling rate increases fix update after review Merge branch 'master' into raphael/sampling_capped_increase Merge branch 'master' into raphael/sampling_capped_increase Co-authored-by: raphael.gavache <raphael.gavache@datadoghq.com>
1 parent 084e3e0 commit f68f9b9

File tree

2 files changed

+226
-1
lines changed

2 files changed

+226
-1
lines changed

dd-trace-core/src/main/java/datadog/trace/common/sampling/RateByServiceTraceSampler.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import datadog.trace.api.cache.DDCaches;
55
import datadog.trace.api.sampling.PrioritySampling;
66
import datadog.trace.api.sampling.SamplingMechanism;
7+
import datadog.trace.api.time.SystemTimeSource;
8+
import datadog.trace.api.time.TimeSource;
79
import datadog.trace.common.writer.RemoteResponseListener;
810
import datadog.trace.core.CoreSpan;
911
import java.util.Collections;
@@ -24,8 +26,21 @@ public class RateByServiceTraceSampler implements Sampler, PrioritySampler, Remo
2426
public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr";
2527

2628
private static final double DEFAULT_RATE = 1.0;
29+
private static final double MAX_RATE_INCREASE_FACTOR = 2.0;
30+
static final long RAMP_UP_INTERVAL_NANOS = 1_000_000_000L;
31+
32+
private final TimeSource timeSource;
2733

2834
private volatile RateSamplersByEnvAndService serviceRates = new RateSamplersByEnvAndService();
35+
private long lastCappedNanos;
36+
37+
public RateByServiceTraceSampler() {
38+
this(SystemTimeSource.INSTANCE);
39+
}
40+
41+
RateByServiceTraceSampler(TimeSource timeSource) {
42+
this.timeSource = timeSource;
43+
}
2944

3045
@Override
3146
public <T extends CoreSpan<T>> boolean sample(final T span) {
@@ -62,6 +77,14 @@ private <T extends CoreSpan<T>> String getSpanEnv(final T span) {
6277
return span.getTag("env", "");
6378
}
6479

80+
static boolean shouldCap(double oldRate, double newRate) {
81+
return oldRate != 0 && newRate > oldRate * MAX_RATE_INCREASE_FACTOR;
82+
}
83+
84+
static double cappedRate(double oldRate) {
85+
return oldRate * MAX_RATE_INCREASE_FACTOR;
86+
}
87+
6588
@Override
6689
public void onResponse(
6790
final String endpoint, final Map<String, Map<String, Number>> responseJson) {
@@ -72,6 +95,13 @@ public void onResponse(
7295
}
7396

7497
log.debug("Update service sampler rates: {} -> {}", endpoint, responseJson);
98+
99+
final RateSamplersByEnvAndService currentSnapshot = serviceRates;
100+
final long now = timeSource.getNanoTicks();
101+
final boolean canIncrease =
102+
lastCappedNanos == 0 || (now - lastCappedNanos) >= RAMP_UP_INTERVAL_NANOS;
103+
boolean anyCapped = false;
104+
75105
final TreeMap<String, TreeMap<String, RateSampler>> updatedEnvServiceRates =
76106
new TreeMap<>(String::compareToIgnoreCase);
77107

@@ -84,17 +114,42 @@ public void onResponse(
84114

85115
EnvAndService envAndService = EnvAndService.fromString(entry.getKey());
86116
if (envAndService.isFallback()) {
117+
double oldRate = currentSnapshot.getFallbackSampler().getSampleRate();
118+
if (shouldCap(oldRate, rate)) {
119+
if (canIncrease) {
120+
rate = cappedRate(oldRate);
121+
anyCapped = true;
122+
} else {
123+
rate = oldRate;
124+
}
125+
}
87126
fallbackSampler = RateByServiceTraceSampler.createRateSampler(rate);
88127
} else {
128+
double oldRate =
129+
currentSnapshot
130+
.getSampler(envAndService.lowerEnv, envAndService.lowerService)
131+
.getSampleRate();
132+
if (shouldCap(oldRate, rate)) {
133+
if (canIncrease) {
134+
rate = cappedRate(oldRate);
135+
anyCapped = true;
136+
} else {
137+
rate = oldRate;
138+
}
139+
}
140+
final double effectiveRate = rate;
89141
Map<String, RateSampler> serviceRates =
90142
updatedEnvServiceRates.computeIfAbsent(
91143
envAndService.lowerEnv, env -> new TreeMap<>(String::compareToIgnoreCase));
92144

93145
serviceRates.computeIfAbsent(
94146
envAndService.lowerService,
95-
service -> RateByServiceTraceSampler.createRateSampler(rate));
147+
service -> RateByServiceTraceSampler.createRateSampler(effectiveRate));
96148
}
97149
}
150+
if (canIncrease && anyCapped) {
151+
lastCappedNanos = now;
152+
}
98153
serviceRates = new RateSamplersByEnvAndService(updatedEnvServiceRates, fallbackSampler);
99154
}
100155

@@ -128,6 +183,10 @@ private static final class RateSamplersByEnvAndService {
128183
this.fallbackSampler = fallbackSampler;
129184
}
130185

186+
RateSampler getFallbackSampler() {
187+
return fallbackSampler;
188+
}
189+
131190
// used in tests only
132191
RateSampler getSampler(EnvAndService envAndService) {
133192
return getSampler(envAndService.lowerEnv, envAndService.lowerService);

dd-trace-core/src/test/groovy/datadog/trace/common/sampling/RateByServiceTraceSamplerTest.groovy

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package datadog.trace.common.sampling
22

33
import datadog.trace.api.DDTags
44
import datadog.trace.api.sampling.PrioritySampling
5+
import datadog.trace.api.time.ControllableTimeSource
56
import datadog.trace.common.writer.ListWriter
67
import datadog.trace.common.writer.LoggingWriter
78
import datadog.trace.common.writer.ddagent.DDAgentApi
@@ -222,6 +223,171 @@ class RateByServiceTraceSamplerTest extends DDCoreSpecification {
222223
'manual.keep' | true | PrioritySampling.USER_KEEP
223224
}
224225

226+
def "shouldCap returns false when rate decreases or stays same"() {
227+
expect:
228+
!RateByServiceTraceSampler.shouldCap(0.8, 0.4)
229+
!RateByServiceTraceSampler.shouldCap(0.5, 0.5)
230+
!RateByServiceTraceSampler.shouldCap(0.5, 1.0) // 1.0 <= 0.5 * 2, no cap needed
231+
}
232+
233+
def "shouldCap returns false when old rate is zero"() {
234+
expect:
235+
!RateByServiceTraceSampler.shouldCap(0.0, 0.5)
236+
!RateByServiceTraceSampler.shouldCap(0.0, 1.0)
237+
}
238+
239+
def "shouldCap returns true when new rate exceeds 2x old rate"() {
240+
expect:
241+
RateByServiceTraceSampler.shouldCap(0.1, 1.0)
242+
RateByServiceTraceSampler.shouldCap(0.2, 0.8)
243+
RateByServiceTraceSampler.shouldCap(0.1, 0.3)
244+
}
245+
246+
def "cappedRate returns 2x old rate"() {
247+
expect:
248+
RateByServiceTraceSampler.cappedRate(0.1) == 0.2
249+
RateByServiceTraceSampler.cappedRate(0.2) == 0.4
250+
RateByServiceTraceSampler.cappedRate(0.4) == 0.8
251+
}
252+
253+
def "ramp-up caps rate increases at 2x per interval"() {
254+
setup:
255+
def time = new ControllableTimeSource()
256+
time.set(1_000_000_000L)
257+
RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler(time)
258+
def tolerance = 0.01
259+
260+
// Set initial rate to 0.1
261+
String response = '{"rate_by_service": {"service:foo,env:bar":0.1, "service:,env:":0.1}}'
262+
serviceSampler.onResponse("traces", serializer.fromJson(response))
263+
264+
expect:
265+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.1) < tolerance
266+
267+
when: "agent restart sends rate 1.0, first interval"
268+
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
269+
response = '{"rate_by_service": {"service:foo,env:bar":1.0, "service:,env:":1.0}}'
270+
serviceSampler.onResponse("traces", serializer.fromJson(response))
271+
272+
then: "rate is capped at 2x = 0.2"
273+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance
274+
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.2) < tolerance
275+
276+
when: "second interval"
277+
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
278+
serviceSampler.onResponse("traces", serializer.fromJson(response))
279+
280+
then: "rate doubles to 0.4"
281+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < tolerance
282+
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.4) < tolerance
283+
284+
when: "third interval"
285+
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
286+
serviceSampler.onResponse("traces", serializer.fromJson(response))
287+
288+
then: "rate doubles to 0.8"
289+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.8) < tolerance
290+
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.8) < tolerance
291+
292+
when: "fourth interval"
293+
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
294+
serviceSampler.onResponse("traces", serializer.fromJson(response))
295+
296+
then: "rate reaches target 1.0 (2x=1.6 > 1.0)"
297+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 1.0) < tolerance
298+
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 1.0) < tolerance
299+
}
300+
301+
def "ramp-down applies immediately"() {
302+
setup:
303+
def time = new ControllableTimeSource()
304+
time.set(1_000_000_000L)
305+
RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler(time)
306+
def tolerance = 0.01
307+
308+
// Set initial rate to 0.8
309+
String response = '{"rate_by_service": {"service:foo,env:bar":0.8, "service:,env:":0.8}}'
310+
serviceSampler.onResponse("traces", serializer.fromJson(response))
311+
312+
when: "rate decreases to 0.2"
313+
response = '{"rate_by_service": {"service:foo,env:bar":0.2, "service:,env:":0.2}}'
314+
serviceSampler.onResponse("traces", serializer.fromJson(response))
315+
316+
then: "decrease is applied immediately"
317+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance
318+
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.2) < tolerance
319+
}
320+
321+
def "rate increase blocked during cooldown"() {
322+
setup:
323+
def time = new ControllableTimeSource()
324+
time.set(1_000_000_000L)
325+
RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler(time)
326+
def tolerance = 0.01
327+
328+
// Set initial rate to 0.1
329+
String response = '{"rate_by_service": {"service:foo,env:bar":0.1}}'
330+
serviceSampler.onResponse("traces", serializer.fromJson(response))
331+
332+
when: "rate jumps, first capped increase"
333+
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
334+
response = '{"rate_by_service": {"service:foo,env:bar":1.0}}'
335+
serviceSampler.onResponse("traces", serializer.fromJson(response))
336+
337+
then: "capped to 0.2"
338+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance
339+
340+
when: "try again immediately (within cooldown)"
341+
serviceSampler.onResponse("traces", serializer.fromJson(response))
342+
343+
then: "rate stays at 0.2 because cooldown hasn't elapsed"
344+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance
345+
346+
when: "after cooldown elapsed"
347+
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
348+
serviceSampler.onResponse("traces", serializer.fromJson(response))
349+
350+
then: "rate doubles to 0.4"
351+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < tolerance
352+
}
353+
354+
def "cooldown not reset by blocked increase"() {
355+
setup:
356+
def time = new ControllableTimeSource()
357+
time.set(1_000_000_000L)
358+
RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler(time)
359+
def tolerance = 0.01
360+
361+
// Set initial low rate
362+
String response = '{"rate_by_service": {"service:foo,env:bar":0.01}}'
363+
serviceSampler.onResponse("traces", serializer.fromJson(response))
364+
365+
expect:
366+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.01) < tolerance
367+
368+
when: "wait for cooldown, apply increase: 0.01 -> 0.02"
369+
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
370+
response = '{"rate_by_service": {"service:foo,env:bar":1.0}}'
371+
serviceSampler.onResponse("traces", serializer.fromJson(response))
372+
373+
then: "rate is capped at 2x = 0.02"
374+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.02) < tolerance
375+
376+
when: "before cooldown elapses, send another increase - rate should be held and lastCapped NOT reset"
377+
time.advance((long) (RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS / 2))
378+
serviceSampler.onResponse("traces", serializer.fromJson(response))
379+
380+
then: "rate stays at 0.02 (cooldown)"
381+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.02) < tolerance
382+
383+
when: "wait remaining half of cooldown from the original cap - should allow next ramp-up"
384+
time.advance((long) (RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS / 2))
385+
serviceSampler.onResponse("traces", serializer.fromJson(response))
386+
387+
then: "rate doubles to 0.04 because lastCapped was NOT reset by the blocked increase"
388+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.04) < tolerance
389+
}
390+
225391
def "not setting forced tracing via tag or setting it wrong value not causing exception"() {
226392
setup:
227393
def sampler = new RateByServiceTraceSampler()

0 commit comments

Comments
 (0)