Skip to content

Commit 74819ec

Browse files
committed
keep tracer header tags on additional endpoints
1 parent 1affec4 commit 74819ec

4 files changed

Lines changed: 101 additions & 32 deletions

File tree

bottlecap/src/traces/trace_aggregator.rs

Lines changed: 77 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use libdd_trace_utils::send_data::SendDataBuilder;
2+
use libdd_trace_utils::trace_utils::TracerHeaderTags;
23
use std::collections::VecDeque;
34

45
/// Maximum content size per payload uncompressed in bytes,
@@ -7,16 +8,71 @@ use std::collections::VecDeque;
78
/// <https://github.com/DataDog/datadog-agent/blob/9d57c10a9eeb3916e661d35dbd23c6e36395a99d/pkg/trace/writer/trace.go#L27-L31>
89
pub const MAX_CONTENT_SIZE_BYTES: usize = 3_200_000;
910

11+
/// Owned version of `TracerHeaderTags<'a>` so it can be stored across async
12+
/// boundaries without lifetime issues.
13+
pub struct OwnedTracerHeaderTags {
14+
pub lang: String,
15+
pub lang_version: String,
16+
pub lang_interpreter: String,
17+
pub lang_vendor: String,
18+
pub tracer_version: String,
19+
pub container_id: String,
20+
pub client_computed_top_level: bool,
21+
pub client_computed_stats: bool,
22+
pub dropped_p0_traces: usize,
23+
pub dropped_p0_spans: usize,
24+
}
25+
26+
impl From<TracerHeaderTags<'_>> for OwnedTracerHeaderTags {
27+
fn from(tags: TracerHeaderTags<'_>) -> Self {
28+
Self {
29+
lang: tags.lang.to_string(),
30+
lang_version: tags.lang_version.to_string(),
31+
lang_interpreter: tags.lang_interpreter.to_string(),
32+
lang_vendor: tags.lang_vendor.to_string(),
33+
tracer_version: tags.tracer_version.to_string(),
34+
container_id: tags.container_id.to_string(),
35+
client_computed_top_level: tags.client_computed_top_level,
36+
client_computed_stats: tags.client_computed_stats,
37+
dropped_p0_traces: tags.dropped_p0_traces,
38+
dropped_p0_spans: tags.dropped_p0_spans,
39+
}
40+
}
41+
}
42+
43+
impl OwnedTracerHeaderTags {
44+
#[must_use]
45+
pub fn to_tracer_header_tags(&self) -> TracerHeaderTags<'_> {
46+
TracerHeaderTags {
47+
lang: &self.lang,
48+
lang_version: &self.lang_version,
49+
lang_interpreter: &self.lang_interpreter,
50+
lang_vendor: &self.lang_vendor,
51+
tracer_version: &self.tracer_version,
52+
container_id: &self.container_id,
53+
client_computed_top_level: self.client_computed_top_level,
54+
client_computed_stats: self.client_computed_stats,
55+
dropped_p0_traces: self.dropped_p0_traces,
56+
dropped_p0_spans: self.dropped_p0_spans,
57+
}
58+
}
59+
}
60+
1061
// Bundle SendDataBuilder with payload size because SendDataBuilder doesn't
1162
// expose a getter for the size
1263
pub struct SendDataBuilderInfo {
1364
pub builder: SendDataBuilder,
1465
pub size: usize,
66+
pub header_tags: OwnedTracerHeaderTags,
1567
}
1668

1769
impl SendDataBuilderInfo {
18-
pub fn new(builder: SendDataBuilder, size: usize) -> Self {
19-
Self { builder, size }
70+
pub fn new(builder: SendDataBuilder, size: usize, header_tags: OwnedTracerHeaderTags) -> Self {
71+
Self {
72+
builder,
73+
size,
74+
header_tags,
75+
}
2076
}
2177
}
2278

@@ -25,7 +81,7 @@ impl SendDataBuilderInfo {
2581
pub struct TraceAggregator {
2682
queue: VecDeque<SendDataBuilderInfo>,
2783
max_content_size_bytes: usize,
28-
buffer: Vec<SendDataBuilder>,
84+
buffer: Vec<SendDataBuilderInfo>,
2985
}
3086

3187
impl Default for TraceAggregator {
@@ -55,7 +111,7 @@ impl TraceAggregator {
55111
}
56112

57113
/// Returns a batch of trace payloads, subject to the max content size.
58-
pub fn get_batch(&mut self) -> Vec<SendDataBuilder> {
114+
pub fn get_batch(&mut self) -> Vec<SendDataBuilderInfo> {
59115
let mut batch_size = 0;
60116

61117
// Fill the batch
@@ -70,7 +126,7 @@ impl TraceAggregator {
70126
break;
71127
}
72128
batch_size += payload_size;
73-
self.buffer.push(payload_info.builder);
129+
self.buffer.push(payload_info);
74130
} else {
75131
break;
76132
}
@@ -95,8 +151,8 @@ mod tests {
95151

96152
use super::*;
97153

98-
fn make_builder(size: usize) -> SendDataBuilder {
99-
let tracer_header_tags = TracerHeaderTags {
154+
fn make_header_tags() -> TracerHeaderTags<'static> {
155+
TracerHeaderTags {
100156
lang: "lang",
101157
lang_version: "lang_version",
102158
lang_interpreter: "lang_interpreter",
@@ -107,21 +163,26 @@ mod tests {
107163
client_computed_stats: true,
108164
dropped_p0_traces: 0,
109165
dropped_p0_spans: 0,
110-
};
111-
SendDataBuilder::new(
166+
}
167+
}
168+
169+
fn make_builder_info(size: usize) -> SendDataBuilderInfo {
170+
let tracer_header_tags = make_header_tags();
171+
let builder = SendDataBuilder::new(
112172
size,
113173
TracerPayloadCollection::V07(Vec::new()),
114-
tracer_header_tags,
174+
tracer_header_tags.clone(),
115175
&Endpoint::from_slice("localhost"),
116-
)
176+
);
177+
SendDataBuilderInfo::new(builder, size, OwnedTracerHeaderTags::from(tracer_header_tags))
117178
}
118179

119180
#[test]
120181
fn test_add() {
121182
let mut aggregator = TraceAggregator::default();
122183
let size = 1;
123184

124-
aggregator.add(SendDataBuilderInfo::new(make_builder(size), size));
185+
aggregator.add(make_builder_info(size));
125186
assert_eq!(aggregator.queue.len(), 1);
126187
}
127188

@@ -130,7 +191,7 @@ mod tests {
130191
let mut aggregator = TraceAggregator::default();
131192
let size = 1;
132193

133-
aggregator.add(SendDataBuilderInfo::new(make_builder(size), size));
194+
aggregator.add(make_builder_info(size));
134195
assert_eq!(aggregator.queue.len(), 1);
135196
let batch = aggregator.get_batch();
136197
assert_eq!(batch.len(), 1);
@@ -142,9 +203,9 @@ mod tests {
142203
let size = 1;
143204

144205
// Add 3 payloads
145-
aggregator.add(SendDataBuilderInfo::new(make_builder(size), size));
146-
aggregator.add(SendDataBuilderInfo::new(make_builder(size), size));
147-
aggregator.add(SendDataBuilderInfo::new(make_builder(size), size));
206+
aggregator.add(make_builder_info(size));
207+
aggregator.add(make_builder_info(size));
208+
aggregator.add(make_builder_info(size));
148209

149210
// The batch should only contain the first 2 payloads
150211
let first_batch = aggregator.get_batch();

bottlecap/src/traces/trace_aggregator_service.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use libdd_trace_utils::send_data::SendDataBuilder;
21
use tokio::sync::{mpsc, oneshot};
32
use tracing::{debug, error};
43

@@ -8,7 +7,7 @@ use crate::traces::trace_aggregator::{
87

98
pub enum AggregatorCommand {
109
InsertPayload(Box<SendDataBuilderInfo>),
11-
GetBatches(oneshot::Sender<Vec<Vec<SendDataBuilder>>>),
10+
GetBatches(oneshot::Sender<Vec<Vec<SendDataBuilderInfo>>>),
1211
Clear,
1312
Shutdown,
1413
}
@@ -27,7 +26,7 @@ impl AggregatorHandle {
2726
.send(AggregatorCommand::InsertPayload(Box::new(payload_info)))
2827
}
2928

30-
pub async fn get_batches(&self) -> Result<Vec<Vec<SendDataBuilder>>, String> {
29+
pub async fn get_batches(&self) -> Result<Vec<Vec<SendDataBuilderInfo>>, String> {
3130
let (response_tx, response_rx) = oneshot::channel();
3231
self.tx
3332
.send(AggregatorCommand::GetBatches(response_tx))
@@ -105,9 +104,11 @@ impl AggregatorService {
105104
#[allow(clippy::unwrap_used)]
106105
mod tests {
107106
use super::*;
107+
use crate::traces::trace_aggregator::OwnedTracerHeaderTags;
108108
use libdd_common::Endpoint;
109109
use libdd_trace_utils::{
110-
trace_utils::TracerHeaderTags, tracer_payload::TracerPayloadCollection,
110+
send_data::SendDataBuilder, trace_utils::TracerHeaderTags,
111+
tracer_payload::TracerPayloadCollection,
111112
};
112113

113114
#[tokio::test]
@@ -131,6 +132,7 @@ mod tests {
131132
dropped_p0_spans: 0,
132133
};
133134
let size = 1;
135+
let owned_tags = OwnedTracerHeaderTags::from(tracer_header_tags.clone());
134136
let payload = SendDataBuilder::new(
135137
size,
136138
TracerPayloadCollection::V07(Vec::new()),
@@ -139,7 +141,7 @@ mod tests {
139141
);
140142

141143
handle
142-
.insert_payload(SendDataBuilderInfo::new(payload, size))
144+
.insert_payload(SendDataBuilderInfo::new(payload, size, owned_tags))
143145
.unwrap();
144146

145147
let batches = handle.get_batches().await.unwrap();

bottlecap/src/traces/trace_flusher.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@ use dogstatsd::api_key::ApiKeyFactory;
55
use libdd_common::Endpoint;
66
use libdd_trace_utils::{
77
config_utils::trace_intake_url_prefixed,
8-
send_data::{SendData, SendDataBuilder},
8+
send_data::SendData,
99
trace_utils::{self},
10-
tracer_header_tags::TracerHeaderTags,
1110
tracer_payload::TracerPayloadCollection,
1211
};
1312
use std::str::FromStr;
@@ -118,25 +117,29 @@ impl TraceFlusher {
118117
let mut batch_tasks = JoinSet::new();
119118

120119
for trace_builders in all_batches {
121-
let traces: Vec<_> = trace_builders
120+
let traces_with_tags: Vec<_> = trace_builders
122121
.into_iter()
123-
.map(|builder| builder.with_api_key(api_key.as_str()))
124-
.map(SendDataBuilder::build)
122+
.map(|info| {
123+
let trace = info.builder.with_api_key(api_key.as_str()).build();
124+
(trace, info.header_tags)
125+
})
125126
.collect();
126127

127128
// Send to ADDITIONAL endpoints for dual-shipping.
128129
// Construct separate SendData objects per endpoint by cloning the inner
129130
// V07 payload data (TracerPayload is Clone, but SendData is not).
130131
for endpoint in self.additional_endpoints.clone() {
131-
let additional_traces: Vec<_> = traces
132+
let additional_traces: Vec<_> = traces_with_tags
132133
.iter()
133-
.filter_map(|trace| match trace.get_payloads() {
134+
.filter_map(|(trace, tags)| match trace.get_payloads() {
134135
TracerPayloadCollection::V07(payloads) => Some(SendData::new(
135136
trace.len(),
136137
TracerPayloadCollection::V07(payloads.clone()),
137-
TracerHeaderTags::default(),
138+
tags.to_tracer_header_tags(),
138139
&endpoint,
139140
)),
141+
// All payloads in the extension are V07 (produced by
142+
// collect_pb_trace_chunks), so this branch is unreachable.
140143
_ => None,
141144
})
142145
.collect();
@@ -146,6 +149,7 @@ impl TraceFlusher {
146149
}
147150

148151
// Send to PRIMARY endpoint (moves traces into the task).
152+
let traces: Vec<_> = traces_with_tags.into_iter().map(|(t, _)| t).collect();
149153
let client_clone = http_client.clone();
150154
batch_tasks.spawn(async move { Self::send_traces(traces, client_clone).await });
151155
}

bottlecap/src/traces/trace_processor.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ use tokio::sync::mpsc::Sender;
3131
use tokio::sync::mpsc::error::SendError;
3232
use tracing::{debug, error};
3333

34-
use super::stats_generator::StatsGenerator;
35-
use super::trace_aggregator::SendDataBuilderInfo;
34+
use crate::traces::stats_generator::StatsGenerator;
35+
use crate::traces::trace_aggregator::{OwnedTracerHeaderTags, SendDataBuilderInfo};
3636

3737
#[derive(Clone)]
3838
#[allow(clippy::module_name_repetitions)]
@@ -372,6 +372,8 @@ impl TraceProcessor for ServerlessTraceProcessor {
372372
}
373373
};
374374

375+
let owned_header_tags = OwnedTracerHeaderTags::from(header_tags.clone());
376+
375377
// Move original payload into builder (no clone needed)
376378
let builder = SendDataBuilder::new(body_size, payload, header_tags, &endpoint)
377379
.with_compression(Compression::Zstd(config.apm_config_compression_level))
@@ -383,7 +385,7 @@ impl TraceProcessor for ServerlessTraceProcessor {
383385
));
384386

385387
(
386-
SendDataBuilderInfo::new(builder, body_size),
388+
SendDataBuilderInfo::new(builder, body_size, owned_header_tags),
387389
payloads_for_stats,
388390
)
389391
}

0 commit comments

Comments
 (0)