Skip to content

Commit 866193c

Browse files
fix
1 parent c95286e, commit 866193c

2 files changed

Lines changed: 71 additions & 31 deletions

File tree

dd-trace-core/src/main/java/datadog/trace/common/sampling/RateByServiceTraceSampler.java

Lines changed: 19 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ public class RateByServiceTraceSampler implements Sampler, PrioritySampler, Remo
2525
public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr";
2626

2727
private static final double DEFAULT_RATE = 1.0;
28+
private static final double MAX_RATE_INCREASE_FACTOR = 2.0;
2829
static final long RAMP_UP_INTERVAL_NANOS = 1_000_000_000L;
2930

3031
private volatile RateSamplersByEnvAndService serviceRates = new RateSamplersByEnvAndService();
@@ -66,14 +67,12 @@ private <T extends CoreSpan<T>> String getSpanEnv(final T span) {
6667
return span.getTag("env", "");
6768
}
6869

69-
static double cappedRate(double oldRate, double newRate, boolean canIncrease) {
70-
if (newRate <= oldRate || oldRate == 0) {
71-
return newRate;
72-
}
73-
if (!canIncrease) {
74-
return oldRate;
75-
}
76-
return Math.min(oldRate * 2, newRate);
70+
static boolean shouldCap(double oldRate, double newRate) {
71+
return oldRate != 0 && newRate > oldRate * MAX_RATE_INCREASE_FACTOR;
72+
}
73+
74+
static double cappedRate(double oldRate) {
75+
return oldRate * MAX_RATE_INCREASE_FACTOR;
7776
}
7877

7978
@Override
@@ -106,30 +105,35 @@ public void onResponse(
106105
EnvAndService envAndService = EnvAndService.fromString(entry.getKey());
107106
if (envAndService.isFallback()) {
108107
double oldRate = currentSnapshot.getFallbackSampler().getSampleRate();
109-
double effective = cappedRate(oldRate, rate, canIncrease);
110-
if (effective != rate) {
108+
if (canIncrease && shouldCap(oldRate, rate)) {
109+
rate = cappedRate(oldRate);
111110
anyCapped = true;
111+
} else if (!canIncrease && shouldCap(oldRate, rate)) {
112+
rate = oldRate;
112113
}
113-
fallbackSampler = RateByServiceTraceSampler.createRateSampler(effective);
114+
fallbackSampler = RateByServiceTraceSampler.createRateSampler(rate);
114115
} else {
115116
double oldRate =
116117
currentSnapshot
117118
.getSampler(envAndService.lowerEnv, envAndService.lowerService)
118119
.getSampleRate();
119-
double effective = cappedRate(oldRate, rate, canIncrease);
120-
if (effective != rate) {
120+
if (canIncrease && shouldCap(oldRate, rate)) {
121+
rate = cappedRate(oldRate);
121122
anyCapped = true;
123+
} else if (!canIncrease && shouldCap(oldRate, rate)) {
124+
rate = oldRate;
122125
}
126+
final double effectiveRate = rate;
123127
Map<String, RateSampler> serviceRates =
124128
updatedEnvServiceRates.computeIfAbsent(
125129
envAndService.lowerEnv, env -> new TreeMap<>(String::compareToIgnoreCase));
126130

127131
serviceRates.computeIfAbsent(
128132
envAndService.lowerService,
129-
service -> RateByServiceTraceSampler.createRateSampler(effective));
133+
service -> RateByServiceTraceSampler.createRateSampler(effectiveRate));
130134
}
131135
}
132-
if (anyCapped) {
136+
if (canIncrease && anyCapped) {
133137
lastCappedNanos = now;
134138
}
135139
serviceRates = new RateSamplersByEnvAndService(updatedEnvServiceRates, fallbackSampler);

dd-trace-core/src/test/groovy/datadog/trace/common/sampling/RateByServiceTraceSamplerTest.groovy

Lines changed: 52 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -222,32 +222,31 @@ class RateByServiceTraceSamplerTest extends DDCoreSpecification {
222222
'manual.keep' | true | PrioritySampling.USER_KEEP
223223
}
224224

225-
def "cappedRate returns new rate when decreasing"() {
225+
def "shouldCap returns false when rate decreases or stays same"() {
226226
expect:
227-
RateByServiceTraceSampler.cappedRate(0.8, 0.4, true) == 0.4
228-
RateByServiceTraceSampler.cappedRate(0.8, 0.4, false) == 0.4
229-
RateByServiceTraceSampler.cappedRate(0.5, 0.5, true) == 0.5
230-
RateByServiceTraceSampler.cappedRate(0.5, 0.5, false) == 0.5
227+
!RateByServiceTraceSampler.shouldCap(0.8, 0.4)
228+
!RateByServiceTraceSampler.shouldCap(0.5, 0.5)
229+
!RateByServiceTraceSampler.shouldCap(0.5, 1.0) // 1.0 <= 0.5 * 2, no cap needed
231230
}
232231

233-
def "cappedRate returns new rate when old rate is zero"() {
232+
def "shouldCap returns false when old rate is zero"() {
234233
expect:
235-
RateByServiceTraceSampler.cappedRate(0.0, 0.5, true) == 0.5
236-
RateByServiceTraceSampler.cappedRate(0.0, 0.5, false) == 0.5
234+
!RateByServiceTraceSampler.shouldCap(0.0, 0.5)
235+
!RateByServiceTraceSampler.shouldCap(0.0, 1.0)
237236
}
238237

239-
def "cappedRate caps increase to 2x when canIncrease"() {
238+
def "shouldCap returns true when new rate exceeds 2x old rate"() {
240239
expect:
241-
RateByServiceTraceSampler.cappedRate(0.1, 1.0, true) == 0.2
242-
RateByServiceTraceSampler.cappedRate(0.2, 1.0, true) == 0.4
243-
RateByServiceTraceSampler.cappedRate(0.4, 1.0, true) == 0.8
244-
RateByServiceTraceSampler.cappedRate(0.4, 0.5, true) == 0.5
240+
RateByServiceTraceSampler.shouldCap(0.1, 1.0)
241+
RateByServiceTraceSampler.shouldCap(0.2, 0.8)
242+
RateByServiceTraceSampler.shouldCap(0.1, 0.3)
245243
}
246244

247-
def "cappedRate holds old rate when canIncrease is false"() {
245+
def "cappedRate returns 2x old rate"() {
248246
expect:
249-
RateByServiceTraceSampler.cappedRate(0.1, 1.0, false) == 0.1
250-
RateByServiceTraceSampler.cappedRate(0.2, 0.8, false) == 0.2
247+
RateByServiceTraceSampler.cappedRate(0.1) == 0.2
248+
RateByServiceTraceSampler.cappedRate(0.2) == 0.4
249+
RateByServiceTraceSampler.cappedRate(0.4) == 0.8
251250
}
252251

253252
def "ramp-up caps rate increases at 2x per interval"() {
@@ -351,6 +350,43 @@ class RateByServiceTraceSamplerTest extends DDCoreSpecification {
351350
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < tolerance
352351
}
353352

353+
def "cooldown not reset by blocked increase"() {
354+
setup:
355+
RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler()
356+
long currentTime = 1_000_000_000L
357+
serviceSampler.nanoTimeSupplier = { -> currentTime }
358+
def tolerance = 0.01
359+
360+
// Set initial low rate
361+
String response = '{"rate_by_service": {"service:foo,env:bar":0.01}}'
362+
serviceSampler.onResponse("traces", serializer.fromJson(response))
363+
364+
expect:
365+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.01) < tolerance
366+
367+
when: "wait for cooldown, apply increase: 0.01 -> 0.02"
368+
currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
369+
response = '{"rate_by_service": {"service:foo,env:bar":1.0}}'
370+
serviceSampler.onResponse("traces", serializer.fromJson(response))
371+
372+
then: "rate is capped at 2x = 0.02"
373+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.02) < tolerance
374+
375+
when: "before cooldown elapses, send another increase - rate should be held and lastCapped NOT reset"
376+
currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS / 2
377+
serviceSampler.onResponse("traces", serializer.fromJson(response))
378+
379+
then: "rate stays at 0.02 (cooldown)"
380+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.02) < tolerance
381+
382+
when: "wait remaining half of cooldown from the original cap - should allow next ramp-up"
383+
currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS / 2
384+
serviceSampler.onResponse("traces", serializer.fromJson(response))
385+
386+
then: "rate doubles to 0.04 because lastCapped was NOT reset by the blocked increase"
387+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.04) < tolerance
388+
}
389+
354390
def "not setting forced tracing via tag or setting it wrong value not causing exception"() {
355391
setup:
356392
def sampler = new RateByServiceTraceSampler()

0 commit comments

Comments
 (0)