Skip to content

Commit dc90ba4

Browse files
committed
fix(sampling): fix agent rate key name and rate limiter fractional token loss
1 parent d275180 commit dc90ba4

3 files changed

Lines changed: 61 additions & 5 deletions

File tree

datadog-opentelemetry/src/sampling/agent_service_sampler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use super::rate_sampler::RateSampler;
1111
#[derive(Debug, serde::Deserialize)]
1212
pub(crate) struct AgentRates<'a> {
1313
#[serde(borrow)]
14-
pub rates_by_service: Option<HashMap<&'a str, f64>>,
14+
pub rate_by_service: Option<HashMap<&'a str, f64>>,
1515
}
1616

1717
#[derive(Debug, Default, Clone)]

datadog-opentelemetry/src/sampling/datadog_sampler.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl DatadogSampler {
5757
let Ok(new_rates) = serde_json::de::from_str::<AgentRates>(s) else {
5858
return;
5959
};
60-
let Some(new_rates) = new_rates.rates_by_service else {
60+
let Some(new_rates) = new_rates.rate_by_service else {
6161
return;
6262
};
6363
service_samplers.update_rates(new_rates.into_iter().map(|(k, v)| (k.to_string(), v)));
@@ -778,6 +778,34 @@ mod tests {
778778
);
779779
}
780780

781+
#[test]
782+
fn test_on_agent_response_deserializes_rate_by_service() {
783+
let sampler = DatadogSampler::new(vec![], 100);
784+
let handler = sampler.on_agent_response();
785+
786+
handler(
787+
r#"{ "rate_by_service": { "service:test,env:staging": 1.0, "service:test,env:prod": 0.3 } }"#,
788+
);
789+
790+
assert_eq!(sampler.service_samplers.len(), 2);
791+
assert_eq!(
792+
sampler
793+
.service_samplers
794+
.get("service:test,env:staging")
795+
.unwrap()
796+
.sample_rate(),
797+
1.0
798+
);
799+
assert_eq!(
800+
sampler
801+
.service_samplers
802+
.get("service:test,env:prod")
803+
.unwrap()
804+
.sample_rate(),
805+
0.3
806+
);
807+
}
808+
781809
#[test]
782810
fn test_update_service_rates() {
783811
let sampler = DatadogSampler::new(vec![], 100);

datadog-opentelemetry/src/sampling/rate_limiter.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,12 @@ impl RateLimiter {
172172
if state.tokens > state.max_tokens {
173173
state.tokens = state.max_tokens;
174174
}
175+
// Only advance last_update by the time consumed by whole tokens, preserving
176+
// fractional progress toward the next token.
177+
let consumed_ns = (tokens_to_add as f64 / self.rate_limit as f64
178+
* state.time_window_ns as f64) as u64;
179+
state.last_update += std::time::Duration::from_nanos(consumed_ns);
175180
}
176-
// Always update last_update, even if no tokens were added (e.g., very short elapsed time
177-
// yielding tokens_to_add = 0)
178-
state.last_update = timestamp;
179181
}
180182

181183
/// Calculate the current window rate
@@ -243,6 +245,32 @@ mod tests {
243245
assert_eq!(limiter.effective_rate(), 0.0);
244246
}
245247

248+
#[test]
249+
fn test_rate_limiter_accumulates_fractional_tokens() {
250+
// With rate=2/s each token takes 500ms. Sleeping 300ms twice (600ms total) must
251+
// yield at least one token. Before the fix, each sub-token call reset last_update
252+
// to the call time, so the second 300ms window also computed only 0.6 tokens and
253+
// the limiter starved indefinitely. Margins: the first assert!(!..) has 200ms of
254+
// headroom below 500ms; the final assert!(..) has 100ms of headroom above 500ms.
255+
let limiter = RateLimiter::new(2, None);
256+
257+
// Drain all initial tokens.
258+
for _ in 0..2 {
259+
assert!(limiter.is_allowed());
260+
}
261+
assert!(!limiter.is_allowed());
262+
263+
// First sleep: 300ms → 0.6 tokens, not enough to allow.
264+
thread::sleep(Duration::from_millis(300));
265+
assert!(!limiter.is_allowed());
266+
267+
// Second sleep: another 300ms. Total elapsed since drain ≈ 600ms → 1.2 tokens.
268+
// The fix preserves fractional progress so this succeeds; the old code reset
269+
// last_update on the first call and only saw another 0.6 tokens here.
270+
thread::sleep(Duration::from_millis(300));
271+
assert!(limiter.is_allowed());
272+
}
273+
246274
#[test]
247275
fn test_rate_limiter_limit_rate() {
248276
let limiter = RateLimiter::new(5, None); // 5 per second

0 commit comments

Comments
 (0)