Skip to content

Commit 13c8b7b

Browse files
authored
feat: [Trace Stats] Move stats generation after trace obfuscation (#855)
## This PR

1. Move stats generation after trace obfuscation, which is the correct order as suggested by the Trace Agent team. Right now stats generation happens before trace obfuscation.
2. Also generate trace stats for the OTLP agent. Right now we only do it for the trace agent.

## Architecture

Copied from #842

<img width="1296" height="674" alt="image" src="https://github.com/user-attachments/assets/2d4cb925-6cfc-4581-8ed6-6bd87cf0d87a" />

## Testing

Tested in the next PR #856, which implements the stats concentrator. Trace stats appeared in Datadog.

<img width="538" height="317" alt="image" src="https://github.com/user-attachments/assets/48b849cc-2413-41d5-8576-5ff657c21a0f" />

## Next steps

1. Implement `StatsConcentrator`.
2. Rename for clarity:
   - `SendingTraceStatsProcessor` -> `TraceStatsGenerator`
   - `stats_sender` -> `stats_generator`
3. Small refactor: consider passing around `stats_sender` instead of `stats_concentrator_handle`. Right now `SendingTraceStatsProcessor::new()` is called in three places. It might be possible to call it only once and then pass it around.

## Notes

Jira: https://datadoghq.atlassian.net/browse/SVLS-7593
1 parent d7acebe commit 13c8b7b

5 files changed

Lines changed: 138 additions & 80 deletions

File tree

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,13 @@ use bottlecap::{
5757
proxy_aggregator,
5858
proxy_flusher::Flusher as ProxyFlusher,
5959
stats_aggregator::StatsAggregator,
60-
stats_concentrator_service::StatsConcentratorService,
60+
stats_concentrator_service::{StatsConcentratorHandle, StatsConcentratorService},
6161
stats_flusher::{self, StatsFlusher},
6262
stats_processor, trace_agent,
6363
trace_aggregator::{self, SendDataBuilderInfo},
6464
trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher},
6565
trace_processor::{self, SendingTraceProcessor},
66+
trace_stats_processor::SendingTraceStatsProcessor,
6667
},
6768
};
6869
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
@@ -433,6 +434,7 @@ async fn extension_loop_active(
433434
stats_flusher,
434435
proxy_flusher,
435436
trace_agent_shutdown_token,
437+
stats_concentrator,
436438
) = start_trace_agent(
437439
config,
438440
&api_key_factory,
@@ -477,6 +479,7 @@ async fn extension_loop_active(
477479
tags_provider.clone(),
478480
trace_processor.clone(),
479481
trace_agent_channel.clone(),
482+
stats_concentrator.clone(),
480483
);
481484

482485
let mut flush_control =
@@ -508,7 +511,7 @@ async fn extension_loop_active(
508511
tokio::select! {
509512
biased;
510513
Some(event) = event_bus.rx.recv() => {
511-
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await {
514+
if let Some(telemetry_event) = handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await {
512515
if let TelemetryRecord::PlatformRuntimeDone{ .. } = telemetry_event.record {
513516
break 'flush_end;
514517
}
@@ -635,7 +638,7 @@ async fn extension_loop_active(
635638
break 'next_invocation;
636639
}
637640
Some(event) = event_bus.rx.recv() => {
638-
handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
641+
handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await;
639642
}
640643
_ = race_flush_interval.tick() => {
641644
if flush_control.flush_strategy == FlushStrategy::Default {
@@ -677,7 +680,7 @@ async fn extension_loop_active(
677680
debug!("Received tombstone event, proceeding with shutdown");
678681
break 'shutdown;
679682
}
680-
handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone()).await;
683+
handle_event_bus_event(event, invocation_processor.clone(), appsec_processor.clone(), tags_provider.clone(), trace_processor.clone(), trace_agent_channel.clone(), stats_concentrator.clone()).await;
681684
}
682685
// Add timeout to prevent hanging indefinitely
683686
() = tokio::time::sleep(tokio::time::Duration::from_millis(300)) => {
@@ -758,6 +761,7 @@ async fn handle_event_bus_event(
758761
tags_provider: Arc<TagProvider>,
759762
trace_processor: Arc<trace_processor::ServerlessTraceProcessor>,
760763
trace_agent_channel: Sender<SendDataBuilderInfo>,
764+
stats_concentrator: StatsConcentratorHandle,
761765
) -> Option<TelemetryEvent> {
762766
match event {
763767
Event::OutOfMemory(event_timestamp) => {
@@ -809,6 +813,9 @@ async fn handle_event_bus_event(
809813
appsec: appsec_processor.clone(),
810814
processor: trace_processor.clone(),
811815
trace_tx: trace_agent_channel.clone(),
816+
stats_sender: Arc::new(SendingTraceStatsProcessor::new(
817+
stats_concentrator.clone(),
818+
)),
812819
}),
813820
event.time.timestamp(),
814821
)
@@ -993,6 +1000,7 @@ fn start_trace_agent(
9931000
Arc<stats_flusher::ServerlessStatsFlusher>,
9941001
Arc<ProxyFlusher>,
9951002
tokio_util::sync::CancellationToken,
1003+
StatsConcentratorHandle,
9961004
) {
9971005
// Stats
9981006
let (stats_concentrator_service, stats_concentrator_handle) =
@@ -1048,7 +1056,7 @@ fn start_trace_agent(
10481056
invocation_processor,
10491057
appsec_processor,
10501058
Arc::clone(tags_provider),
1051-
stats_concentrator_handle,
1059+
stats_concentrator_handle.clone(),
10521060
);
10531061
let trace_agent_channel = trace_agent.get_sender_copy();
10541062
let shutdown_token = trace_agent.shutdown_token();
@@ -1067,6 +1075,7 @@ fn start_trace_agent(
10671075
stats_flusher,
10681076
proxy_flusher,
10691077
shutdown_token,
1078+
stats_concentrator_handle,
10701079
)
10711080
}
10721081

@@ -1151,12 +1160,19 @@ fn start_otlp_agent(
11511160
tags_provider: Arc<TagProvider>,
11521161
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
11531162
trace_tx: Sender<SendDataBuilderInfo>,
1163+
stats_concentrator: StatsConcentratorHandle,
11541164
) -> Option<CancellationToken> {
11551165
if !should_enable_otlp_agent(config) {
11561166
return None;
11571167
}
1158-
1159-
let agent = OtlpAgent::new(config.clone(), tags_provider, trace_processor, trace_tx);
1168+
let stats_sender = Arc::new(SendingTraceStatsProcessor::new(stats_concentrator));
1169+
let agent = OtlpAgent::new(
1170+
config.clone(),
1171+
tags_provider,
1172+
trace_processor,
1173+
trace_tx,
1174+
stats_sender,
1175+
);
11601176
let cancel_token = agent.cancel_token();
11611177
if let Err(e) = agent.start() {
11621178
error!("Error starting OTLP agent: {e:?}");

bottlecap/src/otlp/agent.rs

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ use crate::{
1818
http::{extract_request_body, handler_not_found},
1919
otlp::processor::Processor as OtlpProcessor,
2020
tags::provider,
21-
traces::{trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor},
21+
traces::{
22+
trace_aggregator::SendDataBuilderInfo, trace_processor::TraceProcessor,
23+
trace_stats_processor::SendingTraceStatsProcessor,
24+
},
2225
};
2326

2427
const OTLP_AGENT_HTTP_PORT: u16 = 4318;
@@ -29,6 +32,7 @@ type AgentState = (
2932
OtlpProcessor,
3033
Arc<dyn TraceProcessor + Send + Sync>,
3134
Sender<SendDataBuilderInfo>,
35+
Arc<SendingTraceStatsProcessor>,
3236
);
3337

3438
pub struct Agent {
@@ -37,6 +41,7 @@ pub struct Agent {
3741
processor: OtlpProcessor,
3842
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
3943
trace_tx: Sender<SendDataBuilderInfo>,
44+
stats_sender: Arc<SendingTraceStatsProcessor>,
4045
port: u16,
4146
cancel_token: CancellationToken,
4247
}
@@ -47,6 +52,7 @@ impl Agent {
4752
tags_provider: Arc<provider::Provider>,
4853
trace_processor: Arc<dyn TraceProcessor + Send + Sync>,
4954
trace_tx: Sender<SendDataBuilderInfo>,
55+
stats_sender: Arc<SendingTraceStatsProcessor>,
5056
) -> Self {
5157
let port = Self::parse_port(
5258
config.otlp_config_receiver_protocols_http_endpoint.as_ref(),
@@ -60,6 +66,7 @@ impl Agent {
6066
processor: OtlpProcessor::new(Arc::clone(&config)),
6167
trace_processor,
6268
trace_tx,
69+
stats_sender,
6370
port,
6471
cancel_token,
6572
}
@@ -112,6 +119,7 @@ impl Agent {
112119
self.processor.clone(),
113120
Arc::clone(&self.trace_processor),
114121
self.trace_tx.clone(),
122+
Arc::clone(&self.stats_sender),
115123
);
116124

117125
Router::new()
@@ -126,7 +134,9 @@ impl Agent {
126134
}
127135

128136
async fn v1_traces(
129-
State((config, tags_provider, processor, trace_processor, trace_tx)): State<AgentState>,
137+
State((config, tags_provider, processor, trace_processor, trace_tx, stats_sender)): State<
138+
AgentState,
139+
>,
130140
request: Request,
131141
) -> Response {
132142
let (parts, body) = match extract_request_body(request).await {
@@ -163,34 +173,44 @@ impl Agent {
163173
.into_response();
164174
}
165175

166-
let send_data_builder = trace_processor
167-
.process_traces(
168-
config,
169-
tags_provider,
170-
tracer_header_tags,
171-
traces,
172-
body_size,
173-
None,
174-
)
175-
.await;
176+
let compute_trace_stats = config.compute_trace_stats;
177+
let (send_data_builder, processed_traces) = trace_processor.process_traces(
178+
config,
179+
tags_provider,
180+
tracer_header_tags,
181+
traces,
182+
body_size,
183+
None,
184+
);
176185

177186
match trace_tx.send(send_data_builder).await {
178187
Ok(()) => {
179188
debug!("OTLP | Successfully buffered traces to be aggregated.");
180-
(
181-
StatusCode::OK,
182-
json!({"rate_by_service":{"service:,env:":1}}).to_string(),
183-
)
184-
.into_response()
185189
}
186190
Err(err) => {
187191
error!("OTLP | Error sending traces to the trace aggregator: {err}");
188-
(
192+
return (
189193
StatusCode::INTERNAL_SERVER_ERROR,
190194
json!({ "message": format!("Error sending traces to the trace aggregator: {err}") }).to_string()
191-
).into_response()
195+
).into_response();
196+
}
197+
};
198+
199+
// This needs to be after process_traces() because process_traces()
200+
// performs obfuscation, and we need to compute stats on the obfuscated traces.
201+
if compute_trace_stats {
202+
if let Err(err) = stats_sender.send(&processed_traces) {
203+
// Just log the error. We don't think trace stats are critical, so we don't want to
204+
// return an error if only stats fail to send.
205+
error!("OTLP | Error sending traces to the stats concentrator: {err}");
192206
}
193207
}
208+
209+
(
210+
StatusCode::OK,
211+
json!({"rate_by_service":{"service:,env:":1}}).to_string(),
212+
)
213+
.into_response()
194214
}
195215
}
196216

bottlecap/src/traces/trace_agent.rs

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ const LAMBDA_LOAD_SPAN: &str = "aws.lambda.load";
7878
pub struct TraceState {
7979
pub config: Arc<config::Config>,
8080
pub trace_sender: Arc<trace_processor::SendingTraceProcessor>,
81-
pub stats_sender: Arc<SendingTraceStatsProcessor>,
8281
pub invocation_processor: Arc<Mutex<InvocationProcessor>>,
8382
pub tags_provider: Arc<provider::Provider>,
8483
}
@@ -199,16 +198,17 @@ impl TraceAgent {
199198
}
200199

201200
fn make_router(&self, stats_tx: Sender<pb::ClientStatsPayload>) -> Router {
201+
let stats_sender = Arc::new(SendingTraceStatsProcessor::new(
202+
self.stats_concentrator.clone(),
203+
));
202204
let trace_state = TraceState {
203205
config: Arc::clone(&self.config),
204206
trace_sender: Arc::new(SendingTraceProcessor {
205207
appsec: self.appsec_processor.clone(),
206208
processor: Arc::clone(&self.trace_processor),
207209
trace_tx: self.tx.clone(),
210+
stats_sender,
208211
}),
209-
stats_sender: Arc::new(SendingTraceStatsProcessor::new(
210-
self.stats_concentrator.clone(),
211-
)),
212212
invocation_processor: Arc::clone(&self.invocation_processor),
213213
tags_provider: Arc::clone(&self.tags_provider),
214214
};
@@ -277,7 +277,6 @@ impl TraceAgent {
277277
state.config,
278278
request,
279279
state.trace_sender,
280-
state.stats_sender,
281280
state.invocation_processor,
282281
state.tags_provider,
283282
ApiVersion::V04,
@@ -290,7 +289,6 @@ impl TraceAgent {
290289
state.config,
291290
request,
292291
state.trace_sender,
293-
state.stats_sender,
294292
state.invocation_processor,
295293
state.tags_provider,
296294
ApiVersion::V05,
@@ -430,7 +428,6 @@ impl TraceAgent {
430428
config: Arc<config::Config>,
431429
request: Request,
432430
trace_sender: Arc<SendingTraceProcessor>,
433-
stats_sender: Arc<SendingTraceStatsProcessor>,
434431
invocation_processor: Arc<Mutex<InvocationProcessor>>,
435432
tags_provider: Arc<provider::Provider>,
436433
version: ApiVersion,
@@ -523,15 +520,6 @@ impl TraceAgent {
523520
}
524521
}
525522

526-
if config.compute_trace_stats {
527-
if let Err(err) = stats_sender.send(&traces) {
528-
return error_response(
529-
StatusCode::INTERNAL_SERVER_ERROR,
530-
format!("Error sending stats to the stats aggregator: {err}"),
531-
);
532-
}
533-
}
534-
535523
if let Err(err) = trace_sender
536524
.send_processed_traces(
537525
config,
@@ -545,7 +533,7 @@ impl TraceAgent {
545533
{
546534
return error_response(
547535
StatusCode::INTERNAL_SERVER_ERROR,
548-
format!("Error sending traces to the trace aggregator: {err}"),
536+
format!("Error sending traces to the trace aggregator: {err:?}"),
549537
);
550538
}
551539

0 commit comments

Comments
 (0)