Skip to content

Commit 441083f

Browse files
authored
feat: parallelize batch flusher (#707)
Parallelizes the trace flusher for cases where we send lots of spans. Tested on a function sending 100k spans
1 parent a950e9e commit 441083f

1 file changed

Lines changed: 22 additions & 9 deletions

File tree

bottlecap/src/traces/trace_flusher.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,20 +82,33 @@ impl TraceFlusher for ServerlessTraceFlusher {
8282
// Since we return the original traces on error, we need to clone them before coalescing
8383
let traces_clone = traces.clone();
8484

85-
for coalesced_traces in trace_utils::coalesce_send_data(traces) {
86-
match coalesced_traces
87-
.send_proxy(self.config.https_proxy.as_deref())
88-
.await
89-
.last_result
90-
{
91-
Ok(_) => debug!("Flushing traces took {}ms", start.elapsed().as_millis()),
85+
let coalesced_traces = trace_utils::coalesce_send_data(traces);
86+
let mut tasks = Vec::with_capacity(coalesced_traces.len());
87+
88+
for traces in coalesced_traces {
89+
let https_proxy = self.config.https_proxy.clone();
90+
tasks.push(tokio::spawn(async move {
91+
traces.send_proxy(https_proxy.as_deref()).await.last_result
92+
}));
93+
}
94+
95+
for task in tasks {
96+
match task.await {
97+
Ok(result) => {
98+
if let Err(e) = result {
99+
error!("Error sending trace: {e:?}");
100+
// Return the original traces for retry
101+
return Some(traces_clone);
102+
}
103+
}
92104
Err(e) => {
93-
error!("Error sending trace: {e:?}");
94-
// Return the original traces for retry
105+
error!("Task join error: {e:?}");
106+
// Return the original traces for retry if a task panics
95107
return Some(traces_clone);
96108
}
97109
}
98110
}
111+
debug!("Flushing traces took {}ms", start.elapsed().as_millis());
99112
None
100113
}
101114
}

0 commit comments

Comments
 (0)