Skip to content

Commit 1b01d1f

Browse files
committed
Add 'crates/trace-mini-agent/' from commit '94f84ec7b574009bef6bf8d4aa1c474074a4367f'
git-subtree-dir: crates/trace-mini-agent git-subtree-mainline: 1be056e git-subtree-split: 94f84ec
2 parents 1be056e + 94f84ec commit 1b01d1f

11 files changed

Lines changed: 2132 additions & 0 deletions

File tree

crates/trace-mini-agent/Cargo.toml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
[package]
2+
name = "datadog-trace-mini-agent"
3+
description = "A subset of the trace agent that is shipped alongside tracers in a few serverless use cases (Google Cloud Functions, Azure Functions, and Azure Spring Apps)"
4+
edition.workspace = true
5+
version.workspace = true
6+
rust-version.workspace = true
7+
license.workspace = true
8+
autobenches = false
9+
10+
[lib]
11+
bench = false
12+
13+
[dependencies]
14+
anyhow = "1.0"
15+
hyper = { version = "1.6", features = ["http1", "client", "server"] }
16+
hyper-util = {version = "0.1", features = ["service"] }
17+
tower = { version = "0.5.2", features = ["util"] }
18+
http-body-util = "0.1"
19+
tokio = { version = "1", features = ["macros", "rt-multi-thread"]}
20+
async-trait = "0.1.64"
21+
tracing = { version = "0.1", default-features = false }
22+
serde = { version = "1.0.145", features = ["derive"] }
23+
serde_json = "1.0"
24+
ddcommon = { path = "../ddcommon" }
25+
datadog-trace-protobuf = { path = "../trace-protobuf" }
26+
datadog-trace-utils = { path = "../trace-utils", features = ["mini_agent"] }
27+
datadog-trace-normalization = { path = "../trace-normalization" }
28+
datadog-trace-obfuscation = { path = "../trace-obfuscation" }
29+
30+
[dev-dependencies]
31+
rmp-serde = "1.1.1"
32+
serial_test = "2.0.0"
33+
duplicate = "0.4.1"
34+
tempfile = "3.3.0"
35+
datadog-trace-utils = { path = "../trace-utils", features=["test-utils"] }
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use datadog_trace_utils::send_data::SendData;
5+
use std::collections::VecDeque;
6+
7+
/// Maximum content size per payload uncompressed in bytes,
8+
/// that the Datadog Trace API accepts. The value is 3.2 MB.
9+
///
10+
/// <https://github.com/DataDog/datadog-agent/blob/9d57c10a9eeb3916e661d35dbd23c6e36395a99d/pkg/trace/writer/trace.go#L27-L31>
11+
pub const MAX_CONTENT_SIZE_BYTES: usize = (32 * 1_024 * 1_024) / 10;
12+
13+
#[allow(clippy::module_name_repetitions)]
14+
pub struct TraceAggregator {
15+
queue: VecDeque<SendData>,
16+
max_content_size_bytes: usize,
17+
buffer: Vec<SendData>,
18+
}
19+
20+
impl Default for TraceAggregator {
21+
fn default() -> Self {
22+
TraceAggregator {
23+
queue: VecDeque::new(),
24+
max_content_size_bytes: MAX_CONTENT_SIZE_BYTES,
25+
buffer: Vec::new(),
26+
}
27+
}
28+
}
29+
30+
impl TraceAggregator {
31+
#[allow(dead_code)]
32+
#[allow(clippy::must_use_candidate)]
33+
pub fn new(max_content_size_bytes: usize) -> Self {
34+
TraceAggregator {
35+
queue: VecDeque::new(),
36+
max_content_size_bytes,
37+
buffer: Vec::new(),
38+
}
39+
}
40+
41+
pub fn add(&mut self, p: SendData) {
42+
self.queue.push_back(p);
43+
}
44+
45+
pub fn get_batch(&mut self) -> Vec<SendData> {
46+
let mut batch_size = 0;
47+
48+
// Fill the batch
49+
while batch_size < self.max_content_size_bytes {
50+
if let Some(payload) = self.queue.pop_front() {
51+
let payload_size = payload.len();
52+
53+
// Put stats back in the queue
54+
if batch_size + payload_size > self.max_content_size_bytes {
55+
self.queue.push_front(payload);
56+
break;
57+
}
58+
batch_size += payload_size;
59+
self.buffer.push(payload);
60+
} else {
61+
break;
62+
}
63+
}
64+
65+
std::mem::take(&mut self.buffer)
66+
}
67+
}
68+
69+
#[cfg(test)]
70+
#[allow(clippy::unwrap_used)]
71+
mod tests {
72+
use datadog_trace_utils::{
73+
trace_utils::TracerHeaderTags, tracer_payload::TracerPayloadCollection,
74+
};
75+
use ddcommon::Endpoint;
76+
77+
use super::*;
78+
79+
#[test]
80+
fn test_add() {
81+
let mut aggregator = TraceAggregator::default();
82+
let tracer_header_tags = TracerHeaderTags {
83+
lang: "lang",
84+
lang_version: "lang_version",
85+
lang_interpreter: "lang_interpreter",
86+
lang_vendor: "lang_vendor",
87+
tracer_version: "tracer_version",
88+
container_id: "container_id",
89+
client_computed_top_level: true,
90+
client_computed_stats: true,
91+
dropped_p0_traces: 0,
92+
dropped_p0_spans: 0,
93+
};
94+
let payload = SendData::new(
95+
1,
96+
TracerPayloadCollection::V07(Vec::new()),
97+
tracer_header_tags,
98+
&Endpoint::from_slice("localhost"),
99+
);
100+
101+
aggregator.add(payload.clone());
102+
assert_eq!(aggregator.queue.len(), 1);
103+
assert_eq!(aggregator.queue[0].is_empty(), payload.is_empty());
104+
}
105+
106+
#[test]
107+
fn test_get_batch() {
108+
let mut aggregator = TraceAggregator::default();
109+
let tracer_header_tags = TracerHeaderTags {
110+
lang: "lang",
111+
lang_version: "lang_version",
112+
lang_interpreter: "lang_interpreter",
113+
lang_vendor: "lang_vendor",
114+
tracer_version: "tracer_version",
115+
container_id: "container_id",
116+
client_computed_top_level: true,
117+
client_computed_stats: true,
118+
dropped_p0_traces: 0,
119+
dropped_p0_spans: 0,
120+
};
121+
let payload = SendData::new(
122+
1,
123+
TracerPayloadCollection::V07(Vec::new()),
124+
tracer_header_tags,
125+
&Endpoint::from_slice("localhost"),
126+
);
127+
128+
aggregator.add(payload.clone());
129+
assert_eq!(aggregator.queue.len(), 1);
130+
let batch = aggregator.get_batch();
131+
assert_eq!(batch.len(), 1);
132+
}
133+
134+
#[test]
135+
fn test_get_batch_full_entries() {
136+
let mut aggregator = TraceAggregator::new(2);
137+
let tracer_header_tags = TracerHeaderTags {
138+
lang: "lang",
139+
lang_version: "lang_version",
140+
lang_interpreter: "lang_interpreter",
141+
lang_vendor: "lang_vendor",
142+
tracer_version: "tracer_version",
143+
container_id: "container_id",
144+
client_computed_top_level: true,
145+
client_computed_stats: true,
146+
dropped_p0_traces: 0,
147+
dropped_p0_spans: 0,
148+
};
149+
let payload = SendData::new(
150+
1,
151+
TracerPayloadCollection::V07(Vec::new()),
152+
tracer_header_tags,
153+
&Endpoint::from_slice("localhost"),
154+
);
155+
156+
// Add 3 payloads
157+
aggregator.add(payload.clone());
158+
aggregator.add(payload.clone());
159+
aggregator.add(payload.clone());
160+
161+
// The batch should only contain the first 2 payloads
162+
let first_batch = aggregator.get_batch();
163+
assert_eq!(first_batch.len(), 2);
164+
assert_eq!(aggregator.queue.len(), 1);
165+
166+
// The second batch should only contain the last log
167+
let second_batch = aggregator.get_batch();
168+
assert_eq!(second_batch.len(), 1);
169+
assert_eq!(aggregator.queue.len(), 0);
170+
}
171+
}

0 commit comments

Comments
 (0)