Commit ec644b4

litianning-datadog, claude, and shreyamalpani authored
fix(logs): SLES-2843 bound retries on non-success HTTP status (#1220)
## Overview Fixes [SLES-2843](https://datadoghq.atlassian.net/browse/SLES-2843). `Flusher::send` (in `bottlecap/src/logs/flusher.rs`) only enforced `FLUSH_RETRY_COUNT` on transport errors. Non-success HTTP responses (4xx/5xx) on the `Ok` arm fell through and looped without bound, so a degraded log endpoint would be hit thousands of times per flush. A customer's Observability Pipelines Worker became overloaded during a 2× load test (April 27–30) and started returning HTTP errors. The unbounded retry loop produced ~10,000 attempts per failed flush against an OPW endpoint reached via cross-VPC `EKS → TGW → ELB`, contributing to ~$34K of unexpected AWS Transit Gateway / ELB egress charges over 3 days. Customer log: ``` DD_EXTENSION | ERROR | LOGS | Failed to send request after 2 ms and 10000 attempts: reqwest::Error { kind: Request, ..., source: hyper_util::client::legacy::Error(SendRequest, hyper::Error(Io, Kind(ConnectionReset))) } ``` ### Fix Mirror the `traces/proxy_flusher.rs:163` pattern: cap retries on `Ok(non-success)` too, returning a `FailedRequestError` so the next flush cycle can redrive the request — same recovery semantics as the existing `Err`-branch cap. Also captures and logs the response body for diagnostics (the previous code discarded it via `_ = resp.text().await`). The fix preserves all existing behavior: - 2xx → `Ok` immediately. - 403 → `Ok` early-exit (auth failures are permanent; don't retry). - 4xx / 5xx → retry up to `FLUSH_RETRY_COUNT` (3), then surface as `FailedRequestError` for the next cycle to redrive. - Transport errors (timeout/reset) → unchanged behavior, still capped at `FLUSH_RETRY_COUNT`. ## Testing Tested manually that this fixes the unbounded retry case. 
Before:

```
DD_EXTENSION | ERROR | LOGS | Failed to send request after 5005 ms and 132 attempts: reqwest::Error {..}
```

After:

```
DD_EXTENSION | ERROR | LOGS | Failed to send request after 1 ms and 3 attempts: status 500 Internal Server Error
```

Adds 7 unit tests in `bottlecap/src/logs/flusher.rs` covering every exit path of `Flusher::send`:

| Test | Asserts |
| --- | --- |
| `send_returns_ok_on_success` | 200 → `Ok`, 1 hit |
| `send_returns_ok_on_forbidden_without_retry` | 403 → `Ok`, 1 hit (no retries on permanent auth failures) |
| `send_bounds_retries_on_non_success_status` | Persistent 500 → `Err` after 3 hits (the SLES-2843 regression) |
| `send_bounds_retries_on_4xx_status` | Persistent 404 (e.g. misrouted OPW URL) → `Err` after 3 hits |
| `send_bounds_retries_on_transport_error` | Persistent timeout → `Err` after 3 hits via the `Err` branch |
| `send_succeeds_on_retry_after_transient_failure` | 500 once → 200 → `Ok` (verifies retries actually retry) |
| `failed_request_error_carries_replayable_request` | After exhaustion, the returned error downcasts to `FailedRequestError`, its `request` is cloneable, and the cloned request actually reaches the server with the original body — proves the redrive contract is intact |

The eventual-success test uses a small axum-based stateful server because httpmock 0.7's matcher signature (`fn`, not `Fn`) cannot capture per-request state.

- [x] `cargo test --lib logs::flusher::tests::` — 7/7 pass in 0.75s
- [x] `cargo test --lib logs::` — 57/57 pass (no regressions)
- [x] `cargo test --test logs_integration_test` — passes
- [x] `cargo clippy --lib --tests --features default -- -D warnings` — clean
- [x] `cargo fmt --check` — clean

## Follow-ups (not in this PR)

These are improvements rather than blockers; tracked separately:

1. Add per-attempt backoff/jitter (currently the inner retry loop has no sleep between iterations).
2. Cycle-level circuit breaker — repeated flush cycles can still amplify load on a degraded endpoint via the redrive queue.
3. Fix the misleading `"after X ms and Y attempts"` log message to report cumulative time, not just the last iteration.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

[SLES-2843]: https://datadoghq.atlassian.net/browse/SLES-2843?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: shreyamalpani <shreya.malpani@datadoghq.com>
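Follow-up 1 above (per-attempt backoff/jitter) could look roughly like the stdlib-only sketch below. This is an illustration, not code from this PR: `backoff_with_jitter` and `BASE_BACKOFF_MS` are hypothetical names, and the clock-based jitter is a stand-in for a proper RNG such as the `rand` crate.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Mirrors the extension's retry cap; the backoff base is an assumption.
const FLUSH_RETRY_COUNT: u32 = 3;
const BASE_BACKOFF_MS: u64 = 100;

/// Exponential backoff with cheap pseudo-jitter taken from the clock's
/// sub-second nanoseconds (stdlib-only sketch).
fn backoff_with_jitter(attempt: u32) -> Duration {
    // 100 ms, 200 ms, 400 ms for attempts 1, 2, 3.
    let base_ms = BASE_BACKOFF_MS * 2u64.pow(attempt.saturating_sub(1));
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| u64::from(d.subsec_nanos()))
        .unwrap_or(0);
    // Add up to +50% jitter so many retrying clients don't synchronize.
    let jitter_ms = nanos % (base_ms / 2 + 1);
    Duration::from_millis(base_ms + jitter_ms)
}

fn main() {
    for attempt in 1..=FLUSH_RETRY_COUNT {
        let delay = backoff_with_jitter(attempt);
        println!("attempt {attempt}: would sleep {} ms", delay.as_millis());
        // In the flusher this would be a `tokio::time::sleep(delay).await`
        // between iterations of the inner retry loop.
    }
}
```

In the async flusher the sleep would be `tokio::time::sleep(delay).await` rather than a blocking `std::thread::sleep`, so other in-flight flush tasks keep making progress.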
1 parent 1cbae34 commit ec644b4

1 file changed: `bottlecap/src/logs/flusher.rs` (297 additions, 9 deletions)
```diff
@@ -65,17 +65,14 @@ impl Flusher {
         }

         let mut failed_requests = Vec::new();
+        // `JoinSet::join_all` returns `Vec<T>` directly (not `Vec<Result<T, JoinError>>`),
+        // so each `result` is the `Result<(), Box<dyn Error + Send>>` returned by `send`.
+        // A non-`FailedRequestError` here means the task completed but reported an error
+        // it doesn't want redriven (e.g. could-not-clone); a `FailedRequestError` is a
+        // bounded retry exhaustion and the request must be re-queued for the next cycle.
         for result in set.join_all().await {
-            if let Err(e) = result {
-                debug!("LOGS | Failed to join task: {}", e);
-                continue;
-            }
-
-            // At this point we know the task completed successfully,
-            // but the send operation itself may have failed
             if let Err(e) = result {
                 if let Some(failed_req_err) = e.downcast_ref::<FailedRequestError>() {
-                    // Clone the request from our custom error
                     failed_requests.push(
                         failed_req_err
                             .request
@@ -120,7 +117,10 @@ impl Flusher {
             match resp {
                 Ok(resp) => {
                     let status = resp.status();
-                    _ = resp.text().await;
+                    // Drain the body to allow connection reuse, but do not capture
+                    // or log it: log intakes can echo back the submitted log payload,
+                    // which may contain sensitive customer data.
+                    let _ = resp.text().await;
                     if status == StatusCode::FORBIDDEN {
                         // Access denied. Stop retrying.
                         error!(
@@ -131,6 +131,22 @@ impl Flusher {
                     if status.is_success() {
                         return Ok(());
                     }
+                    if attempts >= FLUSH_RETRY_COUNT {
+                        // Non-success HTTP response (e.g. 4xx/5xx from an OPW or
+                        // intake under load) — surface the request for later retry
+                        // instead of looping unbounded against a degraded endpoint.
+                        error!(
+                            "LOGS | Failed to send request after {} ms and {} attempts: status {status}",
+                            elapsed.as_millis(),
+                            attempts,
+                        );
+                        return Err(Box::new(FailedRequestError {
+                            request: req,
+                            message: format!(
+                                "LOGS | Failed after {attempts} attempts: status {status}"
+                            ),
+                        }));
+                    }
                 }
                 Err(e) => {
                     if attempts >= FLUSH_RETRY_COUNT {
@@ -309,3 +325,275 @@ impl LogsFlusher {
         encoder.finish().map_err(|e| Box::new(e) as Box<dyn Error>)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use httpmock::prelude::*;
+    use std::sync::atomic::Ordering;
+    use std::time::Duration;
+
+    fn build_request(server: &MockServer, timeout: Duration) -> reqwest::RequestBuilder {
+        reqwest::Client::new()
+            .post(server.url("/api/v2/logs"))
+            .timeout(timeout)
+            .body("test")
+    }
+
+    #[tokio::test]
+    async fn send_returns_ok_on_success() {
+        let server = MockServer::start();
+        let mock = server.mock(|when, then| {
+            when.method(POST).path("/api/v2/logs");
+            then.status(200);
+        });
+
+        let result = Flusher::send(build_request(&server, Duration::from_secs(2))).await;
+
+        assert!(result.is_ok(), "2xx response should return Ok immediately");
+        mock.assert_hits(1);
+    }
+
+    #[tokio::test]
+    async fn send_returns_ok_on_forbidden_without_retry() {
+        let server = MockServer::start();
+        let mock = server.mock(|when, then| {
+            when.method(POST).path("/api/v2/logs");
+            then.status(403);
+        });
+
+        let result = Flusher::send(build_request(&server, Duration::from_secs(2))).await;
+
+        assert!(
+            result.is_ok(),
+            "403 is permanent (bad API key) — drop, do not retry"
+        );
+        mock.assert_hits(1);
+    }
+
+    /// Regression test for SLES-2843: a persistent non-success, non-403 status
+    /// (e.g. 500 from an OPW under load) must respect `FLUSH_RETRY_COUNT`
+    /// instead of looping unbounded and hammering the endpoint.
+    #[tokio::test]
+    async fn send_bounds_retries_on_non_success_status() {
+        let server = MockServer::start();
+        let mock = server.mock(|when, then| {
+            when.method(POST).path("/api/v2/logs");
+            then.status(500);
+        });
+
+        // Bound the test so the buggy (unbounded) implementation fails fast
+        // instead of hanging the suite.
+        let result = tokio::time::timeout(
+            Duration::from_secs(3),
+            Flusher::send(build_request(&server, Duration::from_secs(2))),
+        )
+        .await
+        .expect("send must respect FLUSH_RETRY_COUNT and not loop forever on non-success");
+
+        assert!(
+            result.is_err(),
+            "send should return Err after exhausting retries on persistent 5xx"
+        );
+        mock.assert_hits(FLUSH_RETRY_COUNT);
+    }
+
+    #[tokio::test]
+    async fn send_bounds_retries_on_4xx_status() {
+        let server = MockServer::start();
+        let mock = server.mock(|when, then| {
+            when.method(POST).path("/api/v2/logs");
+            then.status(404);
+        });
+
+        let result = tokio::time::timeout(
+            Duration::from_secs(3),
+            Flusher::send(build_request(&server, Duration::from_secs(2))),
+        )
+        .await
+        .expect("send must respect FLUSH_RETRY_COUNT on 4xx (e.g. misrouted OPW URL)");
+
+        assert!(
+            result.is_err(),
+            "persistent 4xx should bound at FLUSH_RETRY_COUNT"
+        );
+        mock.assert_hits(FLUSH_RETRY_COUNT);
+    }
+
+    #[tokio::test]
+    async fn send_bounds_retries_on_transport_error() {
+        let server = MockServer::start();
+        // Server holds the response longer than the client timeout so every
+        // attempt resolves to a reqwest timeout error (the `Err` branch).
+        let mock = server.mock(|when, then| {
+            when.method(POST).path("/api/v2/logs");
+            then.status(200).delay(Duration::from_secs(5));
+        });
+
+        let result = tokio::time::timeout(
+            Duration::from_secs(3),
+            Flusher::send(build_request(&server, Duration::from_millis(50))),
+        )
+        .await
+        .expect("send must respect FLUSH_RETRY_COUNT on transport errors");
+
+        assert!(
+            result.is_err(),
+            "send should return Err after exhausting retries on transport timeout"
+        );
+        mock.assert_hits(FLUSH_RETRY_COUNT);
+    }
+
+    /// Gap #2: when a transient failure clears, the loop must actually retry
+    /// and succeed — not treat the first non-2xx as a permanent failure.
+    /// Without this test, a refactor that early-returns on the first failed
+    /// status would still pass the bounded-retry tests above.
+    ///
+    /// Uses an axum-based stateful mock because httpmock 0.7 matchers cannot
+    /// capture state (their predicate type is `fn`, not `Fn`).
+    #[tokio::test]
+    async fn send_succeeds_on_retry_after_transient_failure() {
+        use axum::{Router, http::StatusCode, routing::post};
+        use std::sync::atomic::AtomicUsize;
+
+        let counter = Arc::new(AtomicUsize::new(0));
+        let app = Router::new().route(
+            "/api/v2/logs",
+            post({
+                let counter = Arc::clone(&counter);
+                move || {
+                    let counter = Arc::clone(&counter);
+                    async move {
+                        // First attempt → 500 (transient), all later attempts → 200.
+                        let n = counter.fetch_add(1, Ordering::SeqCst);
+                        if n == 0 {
+                            StatusCode::INTERNAL_SERVER_ERROR
+                        } else {
+                            StatusCode::OK
+                        }
+                    }
+                }
+            }),
+        );
+
+        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
+            .await
+            .expect("test should be able to bind a local port");
+        let addr = listener
+            .local_addr()
+            .expect("bound listener should have a local addr");
+        tokio::spawn(async move {
+            axum::serve(listener, app)
+                .await
+                .expect("test server should run cleanly");
+        });
+
+        let req = reqwest::Client::new()
+            .post(format!("http://{addr}/api/v2/logs"))
+            .timeout(Duration::from_secs(2))
+            .body("test");
+
+        let result = Flusher::send(req).await;
+
+        assert!(
+            result.is_ok(),
+            "send must retry past a transient failure and succeed on a later attempt"
+        );
+        assert_eq!(
+            counter.load(Ordering::SeqCst),
+            2,
+            "expected exactly one failed attempt + one successful retry"
+        );
+    }
+
+    /// Gap #1: after retry exhaustion, the returned error must carry a
+    /// re-issuable request so `Flusher::flush` can re-queue it for the next
+    /// flush cycle. A refactor that loses or corrupts the request would
+    /// silently turn a transient failure into permanent data loss.
+    ///
+    /// The mock matches on the body too, so any corruption of the stashed
+    /// request's payload would cause the re-issue to miss the mock and fail
+    /// the status assertion below.
+    #[tokio::test]
+    async fn failed_request_error_carries_replayable_request() {
+        let server = MockServer::start();
+        let mock = server.mock(|when, then| {
+            when.method(POST).path("/api/v2/logs").body_contains("test");
+            then.status(500);
+        });
+
+        let err = tokio::time::timeout(
+            Duration::from_secs(3),
+            Flusher::send(build_request(&server, Duration::from_secs(2))),
+        )
+        .await
+        .expect("send must terminate")
+        .expect_err("expected Err after retry exhaustion");
+
+        let failed = err
+            .downcast_ref::<FailedRequestError>()
+            .expect("error must be downcastable to FailedRequestError so flush() can re-queue it");
+
+        let cloned = failed
+            .request
+            .try_clone()
+            .expect("FailedRequestError.request must be cloneable for redrive");
+
+        // Re-issue the stashed request and confirm it actually reaches the
+        // server — proves the request is intact (URL, method, body), not a
+        // corrupted shell. If the body were lost, the body_contains matcher
+        // would miss and the response status would be 404 (httpmock default).
+        let response = cloned
+            .send()
+            .await
+            .expect("re-issued request should be sendable");
+        assert_eq!(
+            response.status().as_u16(),
+            500,
+            "re-issued request must hit the same mock — proves URL, method, and body are intact"
+        );
+
+        // FLUSH_RETRY_COUNT attempts during send + 1 from the re-issue above.
+        mock.assert_hits(FLUSH_RETRY_COUNT + 1);
+    }
+
+    /// Codex P1 (PR #1220): verifies that `Flusher::flush` actually re-queues
+    /// failed requests for the next flush cycle. The earlier dead-code path
+    /// in `flush` swallowed every `FailedRequestError` so the bounded retries
+    /// in `send` would silently drop data on persistent endpoint failures.
+    #[tokio::test]
+    async fn flush_redrives_failed_requests_after_retry_exhaustion() {
+        use crate::config::Config;
+
+        let server = MockServer::start();
+        let mock = server.mock(|when, then| {
+            when.method(POST).path("/api/v2/logs");
+            then.status(500);
+        });
+
+        let api_key_factory = Arc::new(ApiKeyFactory::new("test-key"));
+        let config = Arc::new(Config {
+            logs_config_use_compression: false,
+            ..Config::default()
+        });
+        let flusher = Flusher::new(
+            api_key_factory,
+            server.url(""),
+            config,
+            reqwest::Client::new(),
+        );
+
+        let batches = Some(Arc::new(vec![b"test-batch".to_vec()]));
+
+        let failed = tokio::time::timeout(Duration::from_secs(5), flusher.flush(batches))
+            .await
+            .expect("flush must terminate within bounded retries");
+
+        assert_eq!(
+            failed.len(),
+            1,
+            "after retry exhaustion, the failed request must be returned for redrive on the next flush cycle"
+        );
+        mock.assert_hits(FLUSH_RETRY_COUNT);
+    }
+}
```
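For readers skimming the diff, the fixed control flow of `Flusher::send` can be condensed into a small stdlib-only simulation. This is an illustration only: `send_sim` and `Outcome` are hypothetical names, and scripted `Result<u16, &str>` values stand in for real HTTP responses and transport errors.

```rust
const FLUSH_RETRY_COUNT: usize = 3;

#[derive(Debug, PartialEq)]
enum Outcome {
    Ok,
    DroppedForbidden,
    RetriesExhausted(usize), // attempts made before giving up
}

/// Simulate the fixed control flow over a scripted sequence of per-attempt
/// results: `Ok(status)` models an HTTP response, `Err(_)` a transport error.
/// `results` must be non-empty; past its end the last entry repeats,
/// modeling a persistent failure.
fn send_sim(results: &[Result<u16, &str>]) -> Outcome {
    let mut attempts = 0;
    loop {
        attempts += 1;
        let r = results.get(attempts - 1).or_else(|| results.last()).unwrap();
        match r {
            Ok(status) => {
                if *status == 403 {
                    // Permanent auth failure: drop, never retry.
                    return Outcome::DroppedForbidden;
                }
                if (200..300).contains(status) {
                    return Outcome::Ok;
                }
                // The SLES-2843 fix: non-success statuses are now bounded too.
                if attempts >= FLUSH_RETRY_COUNT {
                    return Outcome::RetriesExhausted(attempts);
                }
            }
            Err(_) => {
                // Transport errors were already bounded before the fix.
                if attempts >= FLUSH_RETRY_COUNT {
                    return Outcome::RetriesExhausted(attempts);
                }
            }
        }
    }
}

fn main() {
    assert_eq!(send_sim(&[Ok(200)]), Outcome::Ok);
    assert_eq!(send_sim(&[Ok(403)]), Outcome::DroppedForbidden);
    assert_eq!(send_sim(&[Ok(500)]), Outcome::RetriesExhausted(3));
    assert_eq!(send_sim(&[Ok(500), Ok(200)]), Outcome::Ok);
    assert_eq!(send_sim(&[Err("timeout")]), Outcome::RetriesExhausted(3));
    println!("all simulated exit paths are bounded");
}
```

In the real code the exhausted cases return a `FailedRequestError` carrying the request so the next flush cycle can redrive it; the simulation only models the attempt counting.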
