Skip to content

Commit 8466769

Browse files
duncanista and claude authored
test(dogstatsd): add critical unit tests for aggregation, retry logic, and metric filtering (#80)
Add tests covering three previously untested critical paths: - Aggregation semantics: verify count metrics sum, gauge metrics take the last value, distribution sketches merge correctly, and mixed metric types stay in separate contexts. - should_try_next_batch() retry logic: cover all branching paths including 202 success, 4xx permanent errors, 5xx temporary errors, payload errors, destination errors with/without status codes. - aws.lambda.enhanced.invocations filter: verify the hardcoded deduplication filter correctly drops this metric while passing through other metrics. Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e580b43 commit 8466769

File tree

5 files changed

+260
-0
lines changed

5 files changed

+260
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/dogstatsd/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ datadog-fips = { path = "../datadog-fips", default-features = false }
2929
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"] }
3030

3131
[dev-dependencies]
32+
http = "1"
3233
mockito = { version = "1.5.0", default-features = false }
3334
proptest = "1.4.0"
3435
tracing-test = { version = "0.2.5", default-features = false }

crates/dogstatsd/src/aggregator.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,4 +757,108 @@ pub mod tests {
757757
assert_eq!(deserialized.sketches().len(), 10);
758758
assert_eq!(deserialized, distribution);
759759
}
760+
761+
#[test]
#[allow(clippy::float_cmp)]
fn count_aggregation_sums_values() {
    // Count samples sharing name, tags, and timestamp bucket collapse into a
    // single context whose value is the arithmetic sum of all samples.
    let mut agg = Aggregator::new(EMPTY_TAGS, 10).unwrap();

    for raw in [
        "hits:3|c|#env:prod|T1656581409",
        "hits:7|c|#env:prod|T1656581409",
        "hits:5|c|#env:prod|T1656581409",
    ] {
        agg.insert(parse(raw).expect("parse failed")).unwrap();
    }

    // Same name + tags + timestamp bucket = one context, values summed
    assert_eq!(agg.map.len(), 1);

    let entry = agg
        .get_entry_by_id(
            "hits".into(),
            &Some(SortedTags::parse("env:prod").unwrap()),
            1656581400,
        )
        .unwrap();
    assert_eq!(entry.value.get_value().unwrap(), 15.0);
}
786+
787+
#[test]
#[allow(clippy::float_cmp)]
fn gauge_aggregation_last_wins() {
    // Gauges do not sum: within one context only the most recent sample
    // survives aggregation.
    let mut agg = Aggregator::new(EMPTY_TAGS, 10).unwrap();

    for raw in [
        "cpu:30.0|g|#host:a|T1656581409",
        "cpu:55.0|g|#host:a|T1656581409",
        "cpu:42.0|g|#host:a|T1656581409",
    ] {
        agg.insert(parse(raw).expect("parse failed")).unwrap();
    }

    // Same context, gauge = last value wins
    assert_eq!(agg.map.len(), 1);

    let entry = agg
        .get_entry_by_id(
            "cpu".into(),
            &Some(SortedTags::parse("host:a").unwrap()),
            1656581400,
        )
        .unwrap();
    assert_eq!(entry.value.get_value().unwrap(), 42.0);
}
812+
813+
#[test]
fn distribution_aggregation_merges_sketches() {
    // Distribution samples for the same context merge into one DDSketch;
    // verify the merged sketch's min/max/sum/count reflect all three points.
    let mut agg = Aggregator::new(EMPTY_TAGS, 10).unwrap();

    for raw in [
        "latency:10.0|d|#svc:web|T1656581409",
        "latency:20.0|d|#svc:web|T1656581409",
        "latency:30.0|d|#svc:web|T1656581409",
    ] {
        agg.insert(parse(raw).expect("parse failed")).unwrap();
    }

    // Same context, distributions merge into one sketch
    assert_eq!(agg.map.len(), 1);

    let entry = agg
        .get_entry_by_id(
            "latency".into(),
            &Some(SortedTags::parse("svc:web").unwrap()),
            1656581400,
        )
        .unwrap();
    let sketch = entry.value.get_sketch().unwrap();
    assert!((sketch.min().unwrap() - 10.0).abs() < PRECISION);
    assert!((sketch.max().unwrap() - 30.0).abs() < PRECISION);
    assert!((sketch.sum().unwrap() - 60.0).abs() < PRECISION);
    assert_eq!(sketch.count(), 3);
}
841+
842+
#[test]
fn mixed_metric_types_stay_separate() {
    // Same metric name with different types and tags must keep distinct
    // contexts: counts/gauges serialize as series, distributions as sketches.
    // NOTE: the previous `#[allow(clippy::float_cmp)]` was removed — this test
    // only compares integer lengths, so the lint suppression was dead weight.
    let mut aggregator = Aggregator::new(EMPTY_TAGS, 10).unwrap();

    // Same name but different types and tags keep different contexts
    let count = parse("req:1|c|#route:a|T1656581409").expect("parse failed");
    let gauge = parse("req:5|g|#route:b|T1656581409").expect("parse failed");
    let dist = parse("req:100|d|#route:c|T1656581409").expect("parse failed");

    aggregator.insert(count).unwrap();
    aggregator.insert(gauge).unwrap();
    aggregator.insert(dist).unwrap();

    assert_eq!(aggregator.map.len(), 3);

    let series = aggregator.to_series();
    assert_eq!(series.len(), 2); // count + gauge

    let protos = aggregator.distributions_to_protobuf();
    assert_eq!(protos.sketches().len(), 1); // distribution
}
760864
}

crates/dogstatsd/src/dogstatsd.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,22 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
827827
.starts_with("custom.namespace.my.metric"));
828828
}
829829

830+
#[tokio::test]
831+
async fn test_dogstatsd_filter_lambda_enhanced_invocations() {
832+
let response = setup_and_consume_dogstatsd(
833+
"aws.lambda.enhanced.invocations:1|c\ncustom.metric:5|c\n",
834+
None,
835+
)
836+
.await;
837+
838+
// aws.lambda.enhanced.invocations should be filtered out
839+
assert_eq!(response.series.len(), 1);
840+
assert_eq!(response.series[0].series.len(), 1);
841+
assert!(response.series[0].series[0]
842+
.metric
843+
.starts_with("custom.metric"));
844+
}
845+
830846
#[tokio::test]
831847
async fn test_create_udp_socket_default_so_rcvbuf() {
832848
let socket = super::create_udp_socket("127.0.0.1:0", None).await.unwrap();

crates/dogstatsd/src/flusher.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,4 +401,142 @@ mod tests {
401401
// Should attempt to flush and return Some with failed metrics (since we're not mocking the API)
402402
assert!(result.is_some());
403403
}
404+
405+
// ---- should_try_next_batch tests ----
406+
407+
/// Helper to build a minimal reqwest::Response with a given status code.
408+
async fn mock_response(status: u16) -> Response {
409+
http::Response::builder()
410+
.status(status)
411+
.body("")
412+
.unwrap()
413+
.into()
414+
}
415+
416+
#[tokio::test]
417+
async fn test_should_try_next_batch_accepted() {
418+
let resp = Ok(mock_response(202).await);
419+
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
420+
assert!(continue_shipping, "202 should continue to next batch");
421+
assert!(!should_retry, "202 should not retry");
422+
}
423+
424+
#[tokio::test]
425+
async fn test_should_try_next_batch_ok_non_accepted() {
426+
// 200 OK is unexpected for the intake API (it expects 202)
427+
let resp = Ok(mock_response(200).await);
428+
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
429+
assert!(
430+
!continue_shipping,
431+
"unexpected 200 is not 4xx, should not continue"
432+
);
433+
assert!(
434+
should_retry,
435+
"unexpected 200 is not 4xx, should retry (treated as transient)"
436+
);
437+
}
438+
439+
#[tokio::test]
440+
async fn test_should_try_next_batch_400_permanent() {
441+
let resp = Ok(mock_response(400).await);
442+
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
443+
assert!(
444+
continue_shipping,
445+
"4xx permanent error should continue to next batch"
446+
);
447+
assert!(!should_retry, "4xx permanent error should not retry");
448+
}
449+
450+
#[tokio::test]
451+
async fn test_should_try_next_batch_403_permanent() {
452+
let resp = Ok(mock_response(403).await);
453+
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
454+
assert!(
455+
continue_shipping,
456+
"403 permanent error should continue to next batch"
457+
);
458+
assert!(!should_retry, "403 permanent error should not retry");
459+
}
460+
461+
#[tokio::test]
462+
async fn test_should_try_next_batch_500_temporary() {
463+
let resp = Ok(mock_response(500).await);
464+
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
465+
assert!(
466+
!continue_shipping,
467+
"5xx temporary error should not continue"
468+
);
469+
assert!(should_retry, "5xx temporary error should retry");
470+
}
471+
472+
#[tokio::test]
473+
async fn test_should_try_next_batch_503_temporary() {
474+
let resp = Ok(mock_response(503).await);
475+
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
476+
assert!(
477+
!continue_shipping,
478+
"503 temporary error should not continue"
479+
);
480+
assert!(should_retry, "503 temporary error should retry");
481+
}
482+
483+
#[tokio::test]
484+
async fn test_should_try_next_batch_payload_error() {
485+
let resp = Err(ShippingError::Payload("bad data".to_string()));
486+
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
487+
assert!(
488+
continue_shipping,
489+
"payload error should continue to next batch (data is malformed)"
490+
);
491+
assert!(
492+
!should_retry,
493+
"payload error should not retry (data won't change)"
494+
);
495+
}
496+
497+
#[tokio::test]
498+
async fn test_should_try_next_batch_destination_error_temporary() {
499+
let resp = Err(ShippingError::Destination(
500+
Some(StatusCode::INTERNAL_SERVER_ERROR),
501+
"server down".to_string(),
502+
));
503+
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
504+
assert!(
505+
!continue_shipping,
506+
"5xx destination error should not continue"
507+
);
508+
assert!(should_retry, "5xx destination error should retry");
509+
}
510+
511+
#[tokio::test]
512+
async fn test_should_try_next_batch_destination_error_permanent() {
513+
let resp = Err(ShippingError::Destination(
514+
Some(StatusCode::FORBIDDEN),
515+
"bad key".to_string(),
516+
));
517+
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
518+
assert!(
519+
!continue_shipping,
520+
"4xx destination error should not continue"
521+
);
522+
assert!(!should_retry, "4xx destination error should not retry");
523+
}
524+
525+
#[tokio::test]
526+
async fn test_should_try_next_batch_destination_error_no_status() {
527+
// No status code (e.g., timeout / connection refused)
528+
let resp = Err(ShippingError::Destination(
529+
None,
530+
"connection refused".to_string(),
531+
));
532+
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
533+
assert!(
534+
!continue_shipping,
535+
"no-status destination error should not continue"
536+
);
537+
assert!(
538+
should_retry,
539+
"no-status destination error should retry (transient)"
540+
);
541+
}
404542
}

0 commit comments

Comments (0)