Skip to content

Commit b8b40d2

Browse files
committed
add stats retry
1 parent 12bcb5f commit b8b40d2

3 files changed

Lines changed: 89 additions & 31 deletions

File tree

bottlecap/src/flushing/handles.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use datadog_protos::metrics::SketchPayload;
44
use dogstatsd::datadog::Series;
5+
use libdd_trace_protobuf::pb;
56
use libdd_trace_utils::send_data::SendData;
67
use tokio::task::JoinHandle;
78

@@ -32,8 +33,8 @@ pub struct FlushHandles {
3233
pub metric_flush_handles: Vec<JoinHandle<MetricsRetryBatch>>,
3334
/// Handles for proxy flush operations. Returns failed request builders for retry.
3435
pub proxy_flush_handles: Vec<JoinHandle<Vec<reqwest::RequestBuilder>>>,
35-
/// Handles for stats flush operations. Stats don't support retry.
36-
pub stats_flush_handles: Vec<JoinHandle<()>>,
36+
/// Handles for stats flush operations. Returns failed stats payloads for retry.
37+
pub stats_flush_handles: Vec<JoinHandle<Vec<pb::ClientStatsPayload>>>,
3738
}
3839

3940
impl FlushHandles {

bottlecap/src/flushing/service.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,13 @@ impl FlushingService {
122122
self.handles.metric_flush_handles.push(handle);
123123
}
124124

125-
// Spawn stats flush (fire-and-forget, no retry)
125+
// Spawn stats flush
126126
let sf = Arc::clone(&self.stats_flusher);
127127
self.handles
128128
.stats_flush_handles
129-
.push(tokio::spawn(async move { sf.flush(false).await }));
129+
.push(tokio::spawn(async move {
130+
sf.flush(false, None).await.unwrap_or_default()
131+
}));
130132

131133
// Spawn proxy flush
132134
let pf = self.proxy_flusher.clone();
@@ -153,11 +155,25 @@ impl FlushingService {
153155
let mut joinset = tokio::task::JoinSet::new();
154156
let mut flush_error = false;
155157

156-
// Await stats handles (no retry)
158+
// Await stats handles with retry
157159
for handle in self.handles.stats_flush_handles.drain(..) {
158-
if let Err(e) = handle.await {
159-
error!("FLUSHING_SERVICE | stats flush error {e:?}");
160-
flush_error = true;
160+
match handle.await {
161+
Ok(retry) => {
162+
let sf = self.stats_flusher.clone();
163+
if !retry.is_empty() {
164+
debug!(
165+
"FLUSHING_SERVICE | redriving {:?} stats payloads",
166+
retry.len()
167+
);
168+
joinset.spawn(async move {
169+
sf.flush(false, Some(retry)).await;
170+
});
171+
}
172+
}
173+
Err(e) => {
174+
error!("FLUSHING_SERVICE | stats flush error {e:?}");
175+
flush_error = true;
176+
}
161177
}
162178
}
163179

@@ -312,7 +328,7 @@ impl FlushingService {
312328
self.logs_flusher.flush(None),
313329
futures::future::join_all(metrics_futures),
314330
self.trace_flusher.flush(None),
315-
self.stats_flusher.flush(force_stats),
331+
self.stats_flusher.flush(force_stats, None),
316332
self.proxy_flusher.flush(None),
317333
);
318334
}

bottlecap/src/traces/stats_flusher.rs

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ pub struct StatsFlusher {
2222
api_key_factory: Arc<ApiKeyFactory>,
2323
endpoint: OnceCell<Endpoint>,
2424
/// Cached HTTP client, lazily initialized on first use.
25-
/// TODO: StatsFlusher and TraceFlusher both hit trace.agent.datadoghq.{site} and could
26-
/// share a single HTTP client for better connection pooling. Consider using a
27-
/// SharedHyperClient wrapper passed to both flushers from main.rs.
25+
/// TODO: `StatsFlusher` and `TraceFlusher` both hit trace.agent.datadoghq.{site} and could
26+
/// share a single HTTP client for better connection pooling.
2827
http_client: OnceCell<HyperClient>,
2928
}
3029

@@ -45,14 +44,20 @@ impl StatsFlusher {
4544
}
4645

4746
/// Flushes stats to the Datadog trace stats intake.
48-
pub async fn send(&self, stats: Vec<pb::ClientStatsPayload>) {
47+
///
48+
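/// Returns `None` when nothing needs retrying (success, empty input, an unresolved API key, or a serialization error), or `Some(stats)` with the original payloads when the HTTP client could not be created or the request failed and should be retried.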
49+
pub async fn send(
50+
&self,
51+
stats: Vec<pb::ClientStatsPayload>,
52+
) -> Option<Vec<pb::ClientStatsPayload>> {
4953
if stats.is_empty() {
50-
return;
54+
return None;
5155
}
5256

5357
let Some(api_key) = self.api_key_factory.get_api_key().await else {
54-
error!("Skipping flushing stats: Failed to resolve API key");
55-
return;
58+
error!("STATS | Skipping flushing stats: Failed to resolve API key");
59+
// No API key means we can't send - don't retry as it won't help
60+
return None;
5661
};
5762

5863
let api_key_clone = api_key.to_string();
@@ -72,17 +77,18 @@ impl StatsFlusher {
7277
})
7378
.await;
7479

75-
debug!("Flushing {} stats", stats.len());
80+
debug!("STATS | Flushing {} stats", stats.len());
7681

77-
let stats_payload = stats_utils::construct_stats_payload(stats);
82+
let stats_payload = stats_utils::construct_stats_payload(stats.clone());
7883

79-
debug!("Stats payload to be sent: {stats_payload:?}");
84+
debug!("STATS | Stats payload to be sent: {stats_payload:?}");
8085

8186
let serialized_stats_payload = match stats_utils::serialize_stats_payload(stats_payload) {
8287
Ok(res) => res,
8388
Err(err) => {
84-
error!("Failed to serialize stats payload, dropping stats: {err}");
85-
return;
89+
// Serialization errors are permanent - data is malformed, don't retry
90+
error!("STATS | Failed to serialize stats payload, dropping stats: {err}");
91+
return None;
8692
}
8793
};
8894

@@ -93,8 +99,8 @@ impl StatsFlusher {
9399
// Get or create the cached HTTP client
94100
let http_client = self.get_or_init_http_client().await;
95101
let Some(http_client) = http_client else {
96-
error!("STATS_FLUSHER | Failed to create HTTP client");
97-
return;
102+
error!("STATS | Failed to create HTTP client, will retry");
103+
return Some(stats);
98104
};
99105

100106
let resp = stats_utils::send_stats_payload_with_client(
@@ -106,27 +112,62 @@ impl StatsFlusher {
106112
.await;
107113
let elapsed = start.elapsed();
108114
debug!(
109-
"Stats request to {} took {} ms",
115+
"STATS | Stats request to {} took {} ms",
110116
stats_url,
111117
elapsed.as_millis()
112118
);
113119
match resp {
114-
Ok(()) => debug!("Successfully flushed stats"),
120+
Ok(()) => {
121+
debug!("STATS | Successfully flushed stats");
122+
None
123+
}
115124
Err(e) => {
116-
error!("Error sending stats: {e:?}");
125+
// Network/server errors are temporary - return stats for retry
126+
error!("STATS | Error sending stats: {e:?}");
127+
Some(stats)
117128
}
118-
};
129+
}
119130
}
120131

121-
pub async fn flush(&self, force_flush: bool) {
122-
let mut guard = self.aggregator.lock().await;
132+
/// Flushes stats from the aggregator.
133+
///
134+
/// Returns `None` when no payloads are left to retry, or `Some(failed_stats)` containing every payload from this call (previously failed and newly batched) that `send` handed back for retry.
135+
/// If `failed_stats` is provided and non-empty, those payloads are sent first, before new batches are fetched from the aggregator.
136+
pub async fn flush(
137+
&self,
138+
force_flush: bool,
139+
failed_stats: Option<Vec<pb::ClientStatsPayload>>,
140+
) -> Option<Vec<pb::ClientStatsPayload>> {
141+
let mut all_failed: Vec<pb::ClientStatsPayload> = Vec::new();
142+
143+
// First, retry any previously failed stats
144+
if let Some(retry_stats) = failed_stats {
145+
if !retry_stats.is_empty() {
146+
debug!(
147+
"STATS | Retrying {} previously failed stats",
148+
retry_stats.len()
149+
);
150+
if let Some(still_failed) = self.send(retry_stats).await {
151+
all_failed.extend(still_failed);
152+
}
153+
}
154+
}
123155

156+
// Then flush new stats from the aggregator
157+
let mut guard = self.aggregator.lock().await;
124158
let mut stats = guard.get_batch(force_flush).await;
125159
while !stats.is_empty() {
126-
self.send(stats).await;
127-
160+
if let Some(failed) = self.send(stats).await {
161+
all_failed.extend(failed);
162+
}
128163
stats = guard.get_batch(force_flush).await;
129164
}
165+
166+
if all_failed.is_empty() {
167+
None
168+
} else {
169+
Some(all_failed)
170+
}
130171
}
131172
/// Returns a reference to the cached HTTP client, initializing it if necessary.
132173
///

0 commit comments

Comments
 (0)