Skip to content

Commit 9eb5294

Browse files
fix(logs): restore retry logic and update flush API signature
- Change LogFlusher::flush() to accept retry builders (Vec<RequestBuilder>)
- Restore redrive loop in FlushingService for transient log flush failures
- Update log_flush_handles type to return retry builders instead of bool
- Suppress clippy::result_large_err in test modules across config and otlp
1 parent 4600dde commit 9eb5294

7 files changed

Lines changed: 30 additions & 12 deletions

File tree

bottlecap/src/config/env.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,7 @@ impl ConfigSource for EnvConfigSource {
711711
#[cfg_attr(coverage_nightly, coverage(off))] // Test modules skew coverage metrics
712712
#[cfg(test)]
713713
mod tests {
714+
#![allow(clippy::result_large_err)]
714715
use std::time::Duration;
715716

716717
use super::*;

bottlecap/src/config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,7 @@ pub fn deserialize_optional_duration_from_seconds_ignore_zero<'de, D: Deserializ
811811
#[cfg_attr(coverage_nightly, coverage(off))] // Test modules skew coverage metrics
812812
#[cfg(test)]
813813
pub mod tests {
814+
#![allow(clippy::result_large_err)]
814815
use libdd_trace_obfuscation::replacer::parse_rules_from_string;
815816

816817
use super::*;

bottlecap/src/config/yaml.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,7 @@ impl ConfigSource for YamlConfigSource {
742742
#[cfg_attr(coverage_nightly, coverage(off))] // Test modules skew coverage metrics
743743
#[cfg(test)]
744744
mod tests {
745+
#![allow(clippy::result_large_err)]
745746
use std::path::Path;
746747
use std::time::Duration;
747748

bottlecap/src/flushing/handles.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ pub struct MetricsRetryBatch {
2727
pub struct FlushHandles {
2828
/// Handles for trace flush operations. Returns failed traces for retry.
2929
pub trace_flush_handles: Vec<JoinHandle<Vec<SendData>>>,
30-
/// Handles for log flush operations. Returns true on success, false on any failure.
31-
pub log_flush_handles: Vec<JoinHandle<bool>>,
30+
/// Handles for log flush operations. Returns builders for transient failures to retry next invocation.
31+
pub log_flush_handles: Vec<JoinHandle<Vec<reqwest::RequestBuilder>>>,
3232
/// Handles for metrics flush operations. Returns batch info for retry.
3333
pub metric_flush_handles: Vec<JoinHandle<MetricsRetryBatch>>,
3434
/// Handles for proxy flush operations. Returns failed request builders for retry.

bottlecap/src/flushing/service.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl FlushingService {
7777
let lf = self.logs_flusher.clone();
7878
self.handles
7979
.log_flush_handles
80-
.push(tokio::spawn(async move { lf.flush().await }));
80+
.push(tokio::spawn(async move { lf.flush(vec![]).await }));
8181

8282
// Spawn traces flush
8383
let tf = self.trace_flusher.clone();
@@ -192,18 +192,32 @@ impl FlushingService {
192192
}
193193
}
194194

195-
// Await log handles — retries are handled internally by LogFlusher
195+
// Await log handles with retry
196196
for handle in self.handles.log_flush_handles.drain(..) {
197197
match handle.await {
198-
Ok(success) => {
199-
if !success {
200-
debug!("FLUSHING_SERVICE | log flush reported a failure");
201-
flush_error = true;
198+
Ok(retry) => {
199+
if !retry.is_empty() {
200+
debug!(
201+
"FLUSHING_SERVICE | redriving {:?} log payloads",
202+
retry.len()
203+
);
204+
}
205+
for item in retry {
206+
let lf = self.logs_flusher.clone();
207+
match item.try_clone() {
208+
Some(item_clone) => {
209+
joinset.spawn(async move {
210+
lf.flush(vec![item_clone]).await;
211+
});
212+
}
213+
None => {
214+
error!("FLUSHING_SERVICE | Can't clone redrive log payloads");
215+
}
216+
}
202217
}
203218
}
204219
Err(e) => {
205-
error!("FLUSHING_SERVICE | log flush task error {e:?}");
206-
flush_error = true;
220+
error!("FLUSHING_SERVICE | redrive log error {e:?}");
207221
}
208222
}
209223
}
@@ -312,7 +326,7 @@ impl FlushingService {
312326
.collect();
313327

314328
tokio::join!(
315-
self.logs_flusher.flush(),
329+
self.logs_flusher.flush(vec![]),
316330
futures::future::join_all(metrics_futures),
317331
self.trace_flusher.flush(None),
318332
self.stats_flusher.flush(force_stats, None),

bottlecap/src/otlp/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub fn should_enable_otlp_agent(config: &Arc<Config>) -> bool {
1616

1717
#[cfg(test)]
1818
mod tests {
19+
#![allow(clippy::result_large_err)]
1920
use super::*;
2021

2122
use std::path::Path;

bottlecap/tests/logs_integration_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ async fn test_logs() {
103103

104104
logs_agent.sync_consume().await;
105105

106-
let _ = logs_flusher.flush().await;
106+
let _ = logs_flusher.flush(vec![]).await;
107107

108108
hello_mock.assert();
109109
}

0 commit comments

Comments
 (0)