Skip to content

Commit c95286e

Browse files
Add capped sampling rate increases
1 parent 1c44c6f commit c95286e

2 files changed

Lines changed: 172 additions & 2 deletions

File tree

dd-trace-core/src/main/java/datadog/trace/common/sampling/RateByServiceTraceSampler.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.Map;
1111
import java.util.TreeMap;
1212
import java.util.function.Function;
13+
import java.util.function.LongSupplier;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
1516

@@ -24,8 +25,11 @@ public class RateByServiceTraceSampler implements Sampler, PrioritySampler, Remo
2425
public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr";
2526

2627
private static final double DEFAULT_RATE = 1.0;
28+
static final long RAMP_UP_INTERVAL_NANOS = 1_000_000_000L;
2729

2830
private volatile RateSamplersByEnvAndService serviceRates = new RateSamplersByEnvAndService();
31+
private long lastCappedNanos;
32+
LongSupplier nanoTimeSupplier = System::nanoTime;
2933

3034
@Override
3135
public <T extends CoreSpan<T>> boolean sample(final T span) {
@@ -62,6 +66,16 @@ private <T extends CoreSpan<T>> String getSpanEnv(final T span) {
6266
return span.getTag("env", "");
6367
}
6468

69+
static double cappedRate(double oldRate, double newRate, boolean canIncrease) {
70+
if (newRate <= oldRate || oldRate == 0) {
71+
return newRate;
72+
}
73+
if (!canIncrease) {
74+
return oldRate;
75+
}
76+
return Math.min(oldRate * 2, newRate);
77+
}
78+
6579
@Override
6680
public void onResponse(
6781
final String endpoint, final Map<String, Map<String, Number>> responseJson) {
@@ -72,6 +86,13 @@ public void onResponse(
7286
}
7387

7488
log.debug("Update service sampler rates: {} -> {}", endpoint, responseJson);
89+
90+
final RateSamplersByEnvAndService currentSnapshot = serviceRates;
91+
final long now = nanoTimeSupplier.getAsLong();
92+
final boolean canIncrease =
93+
lastCappedNanos == 0 || (now - lastCappedNanos) >= RAMP_UP_INTERVAL_NANOS;
94+
boolean anyCapped = false;
95+
7596
final TreeMap<String, TreeMap<String, RateSampler>> updatedEnvServiceRates =
7697
new TreeMap<>(String::compareToIgnoreCase);
7798

@@ -84,17 +105,33 @@ public void onResponse(
84105

85106
EnvAndService envAndService = EnvAndService.fromString(entry.getKey());
86107
if (envAndService.isFallback()) {
87-
fallbackSampler = RateByServiceTraceSampler.createRateSampler(rate);
108+
double oldRate = currentSnapshot.getFallbackSampler().getSampleRate();
109+
double effective = cappedRate(oldRate, rate, canIncrease);
110+
if (effective != rate) {
111+
anyCapped = true;
112+
}
113+
fallbackSampler = RateByServiceTraceSampler.createRateSampler(effective);
88114
} else {
115+
double oldRate =
116+
currentSnapshot
117+
.getSampler(envAndService.lowerEnv, envAndService.lowerService)
118+
.getSampleRate();
119+
double effective = cappedRate(oldRate, rate, canIncrease);
120+
if (effective != rate) {
121+
anyCapped = true;
122+
}
89123
Map<String, RateSampler> serviceRates =
90124
updatedEnvServiceRates.computeIfAbsent(
91125
envAndService.lowerEnv, env -> new TreeMap<>(String::compareToIgnoreCase));
92126

93127
serviceRates.computeIfAbsent(
94128
envAndService.lowerService,
95-
service -> RateByServiceTraceSampler.createRateSampler(rate));
129+
service -> RateByServiceTraceSampler.createRateSampler(effective));
96130
}
97131
}
132+
if (anyCapped) {
133+
lastCappedNanos = now;
134+
}
98135
serviceRates = new RateSamplersByEnvAndService(updatedEnvServiceRates, fallbackSampler);
99136
}
100137

@@ -128,6 +165,10 @@ private static final class RateSamplersByEnvAndService {
128165
this.fallbackSampler = fallbackSampler;
129166
}
130167

168+
RateSampler getFallbackSampler() {
169+
return fallbackSampler;
170+
}
171+
131172
// used in tests only
132173
RateSampler getSampler(EnvAndService envAndService) {
133174
return getSampler(envAndService.lowerEnv, envAndService.lowerService);

dd-trace-core/src/test/groovy/datadog/trace/common/sampling/RateByServiceTraceSamplerTest.groovy

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,135 @@ class RateByServiceTraceSamplerTest extends DDCoreSpecification {
222222
'manual.keep' | true | PrioritySampling.USER_KEEP
223223
}
224224

225+
def "cappedRate returns new rate when decreasing"() {
226+
expect:
227+
RateByServiceTraceSampler.cappedRate(0.8, 0.4, true) == 0.4
228+
RateByServiceTraceSampler.cappedRate(0.8, 0.4, false) == 0.4
229+
RateByServiceTraceSampler.cappedRate(0.5, 0.5, true) == 0.5
230+
RateByServiceTraceSampler.cappedRate(0.5, 0.5, false) == 0.5
231+
}
232+
233+
def "cappedRate returns new rate when old rate is zero"() {
234+
expect:
235+
RateByServiceTraceSampler.cappedRate(0.0, 0.5, true) == 0.5
236+
RateByServiceTraceSampler.cappedRate(0.0, 0.5, false) == 0.5
237+
}
238+
239+
def "cappedRate caps increase to 2x when canIncrease"() {
240+
expect:
241+
RateByServiceTraceSampler.cappedRate(0.1, 1.0, true) == 0.2
242+
RateByServiceTraceSampler.cappedRate(0.2, 1.0, true) == 0.4
243+
RateByServiceTraceSampler.cappedRate(0.4, 1.0, true) == 0.8
244+
RateByServiceTraceSampler.cappedRate(0.4, 0.5, true) == 0.5
245+
}
246+
247+
def "cappedRate holds old rate when canIncrease is false"() {
248+
expect:
249+
RateByServiceTraceSampler.cappedRate(0.1, 1.0, false) == 0.1
250+
RateByServiceTraceSampler.cappedRate(0.2, 0.8, false) == 0.2
251+
}
252+
253+
def "ramp-up caps rate increases at 2x per interval"() {
254+
setup:
255+
RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler()
256+
long currentTime = 1_000_000_000L
257+
serviceSampler.nanoTimeSupplier = { -> currentTime }
258+
def tolerance = 0.01
259+
260+
// Set initial rate to 0.1
261+
String response = '{"rate_by_service": {"service:foo,env:bar":0.1, "service:,env:":0.1}}'
262+
serviceSampler.onResponse("traces", serializer.fromJson(response))
263+
264+
expect:
265+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.1) < tolerance
266+
267+
when: "agent restart sends rate 1.0, first interval"
268+
currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
269+
response = '{"rate_by_service": {"service:foo,env:bar":1.0, "service:,env:":1.0}}'
270+
serviceSampler.onResponse("traces", serializer.fromJson(response))
271+
272+
then: "rate is capped at 2x = 0.2"
273+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance
274+
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.2) < tolerance
275+
276+
when: "second interval"
277+
currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
278+
serviceSampler.onResponse("traces", serializer.fromJson(response))
279+
280+
then: "rate doubles to 0.4"
281+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < tolerance
282+
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.4) < tolerance
283+
284+
when: "third interval"
285+
currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
286+
serviceSampler.onResponse("traces", serializer.fromJson(response))
287+
288+
then: "rate doubles to 0.8"
289+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.8) < tolerance
290+
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.8) < tolerance
291+
292+
when: "fourth interval"
293+
currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
294+
serviceSampler.onResponse("traces", serializer.fromJson(response))
295+
296+
then: "rate reaches target 1.0 (2x=1.6 > 1.0)"
297+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 1.0) < tolerance
298+
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 1.0) < tolerance
299+
}
300+
301+
def "ramp-down applies immediately"() {
302+
setup:
303+
RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler()
304+
long currentTime = 1_000_000_000L
305+
serviceSampler.nanoTimeSupplier = { -> currentTime }
306+
def tolerance = 0.01
307+
308+
// Set initial rate to 0.8
309+
String response = '{"rate_by_service": {"service:foo,env:bar":0.8, "service:,env:":0.8}}'
310+
serviceSampler.onResponse("traces", serializer.fromJson(response))
311+
312+
when: "rate decreases to 0.2"
313+
response = '{"rate_by_service": {"service:foo,env:bar":0.2, "service:,env:":0.2}}'
314+
serviceSampler.onResponse("traces", serializer.fromJson(response))
315+
316+
then: "decrease is applied immediately"
317+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance
318+
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.2) < tolerance
319+
}
320+
321+
def "rate increase blocked during cooldown"() {
322+
setup:
323+
RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler()
324+
long currentTime = 1_000_000_000L
325+
serviceSampler.nanoTimeSupplier = { -> currentTime }
326+
def tolerance = 0.01
327+
328+
// Set initial rate to 0.1
329+
String response = '{"rate_by_service": {"service:foo,env:bar":0.1}}'
330+
serviceSampler.onResponse("traces", serializer.fromJson(response))
331+
332+
when: "rate jumps, first capped increase"
333+
currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
334+
response = '{"rate_by_service": {"service:foo,env:bar":1.0}}'
335+
serviceSampler.onResponse("traces", serializer.fromJson(response))
336+
337+
then: "capped to 0.2"
338+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance
339+
340+
when: "try again immediately (within cooldown)"
341+
serviceSampler.onResponse("traces", serializer.fromJson(response))
342+
343+
then: "rate stays at 0.2 because cooldown hasn't elapsed"
344+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance
345+
346+
when: "after cooldown elapsed"
347+
currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
348+
serviceSampler.onResponse("traces", serializer.fromJson(response))
349+
350+
then: "rate doubles to 0.4"
351+
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < tolerance
352+
}
353+
225354
def "not setting forced tracing via tag or setting it wrong value not causing exception"() {
226355
setup:
227356
def sampler = new RateByServiceTraceSampler()

0 commit comments

Comments
 (0)