Skip to content

Commit 4fb2cfe

Browse files
fix(logs): redrive failed requests + drop response body from error log
Address PR #1220 review comments.

1. (Codex P1) `Flusher::flush` was dropping `FailedRequestError` instead of redriving. The previous double-`if let Err(e) = result` pattern only ever ran the first arm — `JoinSet::join_all` returns `Vec<T>`, not `Vec<Result<T, JoinError>>`, so `result` is the send Result directly. The first arm logged "Failed to join task" and `continue`d, making the second arm unreachable. Net effect: every bounded retry exhaustion silently dropped the batch instead of re-queuing it. Collapse to a single, correct arm.

2. (Copilot) Stop logging the response body on retry exhaustion. Log intakes can echo back the submitted payload, which may contain sensitive customer data, and bodies can be arbitrarily large. Keep the status code in the error message; drain the body to allow connection reuse but discard it.

3. (Copilot) Add body matcher to `failed_request_error_carries_replayable_request` so the test would fail if the stashed request's body were corrupted — previously only path/method were verified.

Adds `flush_redrives_failed_requests_after_retry_exhaustion` to lock in the redrive contract at the `Flusher::flush` entry point. Without the fix above, this test fails with `failed.len() == 0`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a4d1e00 commit 4fb2cfe

1 file changed

Lines changed: 63 additions & 13 deletions

File tree

bottlecap/src/logs/flusher.rs

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,14 @@ impl Flusher {
6565
}
6666

6767
let mut failed_requests = Vec::new();
68+
// `JoinSet::join_all` returns `Vec<T>` directly (not `Vec<Result<T, JoinError>>`),
69+
// so each `result` is the `Result<(), Box<dyn Error + Send>>` returned by `send`.
70+
// A non-`FailedRequestError` here means the task completed but reported an error
71+
// it doesn't want redriven (e.g. could-not-clone); a `FailedRequestError` is a
72+
// bounded retry exhaustion and the request must be re-queued for the next cycle.
6873
for result in set.join_all().await {
69-
if let Err(e) = result {
70-
debug!("LOGS | Failed to join task: {}", e);
71-
continue;
72-
}
73-
74-
// At this point we know the task completed successfully,
75-
// but the send operation itself may have failed
7674
if let Err(e) = result {
7775
if let Some(failed_req_err) = e.downcast_ref::<FailedRequestError>() {
78-
// Clone the request from our custom error
7976
failed_requests.push(
8077
failed_req_err
8178
.request
@@ -120,7 +117,10 @@ impl Flusher {
120117
match resp {
121118
Ok(resp) => {
122119
let status = resp.status();
123-
let body = resp.text().await.unwrap_or_default();
120+
// Drain the body to allow connection reuse, but do not capture
121+
// or log it: log intakes can echo back the submitted log payload,
122+
// which may contain sensitive customer data.
123+
let _ = resp.text().await;
124124
if status == StatusCode::FORBIDDEN {
125125
// Access denied. Stop retrying.
126126
error!(
@@ -136,7 +136,7 @@ impl Flusher {
136136
// intake under load) — surface the request for later retry
137137
// instead of looping unbounded against a degraded endpoint.
138138
error!(
139-
"LOGS | Failed to send request after {} ms and {} attempts: status {status}, body: {body}",
139+
"LOGS | Failed to send request after {} ms and {} attempts: status {status}",
140140
elapsed.as_millis(),
141141
attempts,
142142
);
@@ -510,11 +510,15 @@ mod tests {
510510
/// re-issuable request so `Flusher::flush` can re-queue it for the next
511511
/// flush cycle. A refactor that loses or corrupts the request would
512512
/// silently turn a transient failure into permanent data loss.
513+
///
514+
/// The mock matches on the body too, so any corruption of the stashed
515+
/// request's payload would cause the re-issue to miss the mock and fail
516+
/// the status assertion below.
513517
#[tokio::test]
514518
async fn failed_request_error_carries_replayable_request() {
515519
let server = MockServer::start();
516520
let mock = server.mock(|when, then| {
517-
when.method(POST).path("/api/v2/logs");
521+
when.method(POST).path("/api/v2/logs").body_contains("test");
518522
then.status(500);
519523
});
520524

@@ -536,14 +540,60 @@ mod tests {
536540
.expect("FailedRequestError.request must be cloneable for redrive");
537541

538542
// Re-issue the stashed request and confirm it actually reaches the
539-
// server — proves the request is intact, not a corrupted shell.
543+
// server — proves the request is intact (URL, method, body), not a
544+
// corrupted shell. If the body were lost, the body_contains matcher
545+
// would miss and the response status would be 404 (httpmock default).
540546
let response = cloned
541547
.send()
542548
.await
543549
.expect("re-issued request should be sendable");
544-
assert_eq!(response.status().as_u16(), 500);
550+
assert_eq!(
551+
response.status().as_u16(),
552+
500,
553+
"re-issued request must hit the same mock — proves URL, method, and body are intact"
554+
);
545555

546556
// FLUSH_RETRY_COUNT attempts during send + 1 from the re-issue above.
547557
mock.assert_hits(FLUSH_RETRY_COUNT + 1);
548558
}
559+
560+
/// Codex P1 (PR #1220): verifies that `Flusher::flush` actually re-queues
561+
/// failed requests for the next flush cycle. The earlier dead-code path
562+
/// in `flush` swallowed every `FailedRequestError` so the bounded retries
563+
/// in `send` would silently drop data on persistent endpoint failures.
564+
#[tokio::test]
565+
async fn flush_redrives_failed_requests_after_retry_exhaustion() {
566+
use crate::config::Config;
567+
568+
let server = MockServer::start();
569+
let mock = server.mock(|when, then| {
570+
when.method(POST).path("/api/v2/logs");
571+
then.status(500);
572+
});
573+
574+
let api_key_factory = Arc::new(ApiKeyFactory::new("test-key"));
575+
let config = Arc::new(Config {
576+
logs_config_use_compression: false,
577+
..Config::default()
578+
});
579+
let flusher = Flusher::new(
580+
api_key_factory,
581+
server.url(""),
582+
config,
583+
reqwest::Client::new(),
584+
);
585+
586+
let batches = Some(Arc::new(vec![b"test-batch".to_vec()]));
587+
588+
let failed = tokio::time::timeout(Duration::from_secs(5), flusher.flush(batches))
589+
.await
590+
.expect("flush must terminate within bounded retries");
591+
592+
assert_eq!(
593+
failed.len(),
594+
1,
595+
"after retry exhaustion, the failed request must be returned for redrive on the next flush cycle"
596+
);
597+
mock.assert_hits(FLUSH_RETRY_COUNT);
598+
}
549599
}

0 commit comments

Comments
 (0)