Skip to content

Commit 30248ca

Browse files
authored
S3 Downstream Span Pointers (#503)
1 parent b1a9bc1 commit 30248ca

7 files changed

Lines changed: 353 additions & 3 deletions

File tree

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ impl Processor {
361361
header_tags,
362362
vec![traces],
363363
body_size,
364+
self.inferrer.span_pointers.clone(),
364365
);
365366

366367
if let Err(e) = trace_agent_tx.send(send_data).await {

bottlecap/src/lifecycle/invocation/span_inferrer.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::lifecycle::invocation::{
2222
Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_ARN_TAG,
2323
},
2424
};
25+
use crate::traces::span_pointers::SpanPointer;
2526
use crate::traces::{context::SpanContext, propagation::Propagator};
2627

2728
#[derive(Default)]
@@ -39,6 +40,8 @@ pub struct SpanInferrer {
3940
generated_span_context: Option<SpanContext>,
4041
// Tags generated from the trigger
4142
trigger_tags: Option<HashMap<String, String>>,
43+
// Span pointers from S3 or DynamoDB streams
44+
pub span_pointers: Option<Vec<SpanPointer>>,
4245
}
4346

4447
impl SpanInferrer {
@@ -52,6 +55,7 @@ impl SpanInferrer {
5255
carrier: None,
5356
generated_span_context: None,
5457
trigger_tags: None,
58+
span_pointers: None,
5559
}
5660
}
5761

@@ -178,6 +182,7 @@ impl SpanInferrer {
178182
} else if S3Record::is_match(payload_value) {
179183
if let Some(t) = S3Record::new(payload_value.clone()) {
180184
t.enrich_span(&mut inferred_span, &self.service_mapping);
185+
self.span_pointers = t.get_span_pointers();
181186

182187
trigger = Some(Box::new(t));
183188
}

bottlecap/src/lifecycle/invocation/triggers/s3_event.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::lifecycle::invocation::{
1010
processor::MS_TO_NS,
1111
triggers::{ServiceNameResolver, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG},
1212
};
13+
use crate::traces::span_pointers::{generate_span_pointer_hash, SpanPointer};
1314

1415
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
1516
pub struct S3Event {
@@ -133,6 +134,28 @@ impl ServiceNameResolver for S3Record {
133134
}
134135
}
135136

137+
impl S3Record {
138+
pub fn get_span_pointers(&self) -> Option<Vec<SpanPointer>> {
139+
let bucket_name = &self.s3.bucket.name;
140+
let key = &self.s3.object.key;
141+
// The AWS SDK sometimes wraps the S3 eTag in quotes, but sometimes doesn't.
142+
let e_tag = self.s3.object.e_tag.trim_matches('"');
143+
144+
if bucket_name.is_empty() || key.is_empty() || e_tag.is_empty() {
145+
debug!("Unable to create span pointer because bucket name, key, or etag is missing.");
146+
return None;
147+
}
148+
149+
// https://github.com/DataDog/dd-span-pointer-rules/blob/main/AWS/S3/Object/README.md
150+
let hash = generate_span_pointer_hash(&[bucket_name, key, e_tag]);
151+
152+
Some(vec![SpanPointer {
153+
hash,
154+
kind: String::from("aws.s3.object"),
155+
}])
156+
}
157+
}
158+
136159
#[cfg(test)]
137160
#[allow(clippy::unwrap_used)]
138161
mod tests {
@@ -274,4 +297,51 @@ mod tests {
274297
"generic-service"
275298
);
276299
}
300+
301+
#[test]
302+
fn test_get_span_pointers() {
303+
let event = S3Record {
304+
event_source: String::from("aws:s3"),
305+
event_time: Utc::now(),
306+
event_name: String::from("ObjectCreated:Put"),
307+
s3: S3Entity {
308+
bucket: S3Bucket {
309+
name: String::from("test-bucket"),
310+
arn: String::from("arn:aws:s3:::test-bucket"),
311+
},
312+
object: S3Object {
313+
key: String::from("test/key"),
314+
size: 1024,
315+
e_tag: String::from("0123456789abcdef0123456789abcdef"),
316+
},
317+
},
318+
}; //
319+
320+
let span_pointers = event.get_span_pointers().expect("Should return Some(vec)");
321+
assert_eq!(span_pointers.len(), 1);
322+
assert_eq!(span_pointers[0].kind, "aws.s3.object");
323+
assert_eq!(span_pointers[0].hash, "40df87dbfdf59f32253a2668c23e51b4");
324+
}
325+
326+
#[test]
327+
fn test_get_span_pointers_missing_fields() {
328+
let event = S3Record {
329+
event_source: String::from("aws:s3"),
330+
event_time: Utc::now(),
331+
event_name: String::from("ObjectCreated:Put"),
332+
s3: S3Entity {
333+
bucket: S3Bucket {
334+
name: String::new(), // Empty bucket name
335+
arn: String::from("arn"),
336+
},
337+
object: S3Object {
338+
key: String::from("key"),
339+
size: 0,
340+
e_tag: String::from("etag"),
341+
},
342+
},
343+
};
344+
345+
assert!(event.get_span_pointers().is_none());
346+
}
277347
}

bottlecap/src/traces/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
pub mod context;
55
pub mod propagation;
6+
pub mod span_pointers;
67
pub mod stats_flusher;
78
pub mod stats_processor;
89
pub mod trace_agent;
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use datadog_trace_protobuf::pb::SpanLink;
5+
use sha2::{Digest, Sha256};
6+
use std::collections::HashMap;
7+
8+
const SPAN_POINTER_HASH_LENGTH: usize = 32;
9+
10+
#[derive(Clone)]
11+
pub struct SpanPointer {
12+
pub hash: String,
13+
pub kind: String,
14+
}
15+
16+
/// Returns the first 32 characters of the SHA-256 hash of the components joined by a '|'.
17+
/// Used by span pointers to uniquely & deterministically identify an `S3` or `DynamoDB` stream.
18+
/// <https://github.com/DataDog/dd-span-pointer-rules/blob/main/README.md#General%20Hashing%20Rules>
19+
#[must_use]
20+
pub fn generate_span_pointer_hash(components: &[&str]) -> String {
21+
let mut hasher = Sha256::new();
22+
hasher.update(components.join("|").as_bytes());
23+
let result = hasher.finalize();
24+
hex::encode(result)[..SPAN_POINTER_HASH_LENGTH].to_string()
25+
}
26+
27+
pub fn attach_span_pointers_to_meta<S: ::std::hash::BuildHasher>(
28+
meta: &mut HashMap<String, String, S>,
29+
span_pointers: &Option<Vec<SpanPointer>>,
30+
) {
31+
let Some(span_pointers) = span_pointers.as_ref().filter(|sp| !sp.is_empty()) else {
32+
return;
33+
};
34+
35+
let new_span_links: Vec<SpanLink> = span_pointers
36+
.iter()
37+
.map(|sp| {
38+
SpanLink {
39+
// We set all these fields as 0 or empty since they're unknown; the frontend
40+
// uses `ptr.hash` instead to find the opposite link if it exists.
41+
trace_id: 0,
42+
span_id: 0,
43+
trace_id_high: 0,
44+
tracestate: String::new(),
45+
flags: 0,
46+
attributes: HashMap::from([
47+
("link.kind".to_string(), "span-pointer".to_string()),
48+
("ptr.dir".to_string(), "u".to_string()),
49+
("ptr.hash".to_string(), sp.hash.clone()),
50+
("ptr.kind".to_string(), sp.kind.clone()),
51+
]),
52+
}
53+
})
54+
.collect();
55+
56+
let mut all_span_links = meta
57+
.get("_dd.span_links")
58+
.and_then(|existing| serde_json::from_str::<Vec<SpanLink>>(existing).ok())
59+
.unwrap_or_default();
60+
61+
all_span_links.extend(new_span_links);
62+
let _ = serde_json::to_string(&all_span_links)
63+
.map(|json| meta.insert("_dd.span_links".to_string(), json));
64+
}
65+
66+
#[cfg(test)]
67+
mod tests {
68+
use super::*;
69+
use serde_json::json;
70+
use std::collections::HashMap;
71+
72+
#[derive(Debug, Default)]
73+
struct TestSpan {
74+
pub meta: HashMap<String, String>,
75+
}
76+
77+
struct SpanPointerTestCase {
78+
test_name: &'static str,
79+
existing_links: Option<serde_json::Value>,
80+
span_pointers: Option<Vec<SpanPointer>>,
81+
expected_links: Option<serde_json::Value>,
82+
}
83+
84+
#[test]
85+
fn test_attach_span_pointers_to_span() {
86+
let test_cases = vec![
87+
SpanPointerTestCase {
88+
test_name: "adds span links to span",
89+
existing_links: None,
90+
span_pointers: Some(vec![
91+
SpanPointer {
92+
hash: "hash1".to_string(),
93+
kind: "test.kind1".to_string(),
94+
},
95+
SpanPointer {
96+
hash: "hash2".to_string(),
97+
kind: "test.kind2".to_string(),
98+
},
99+
]),
100+
expected_links: Some(json!([
101+
{
102+
"attributes": {
103+
"link.kind": "span-pointer",
104+
"ptr.dir": "u",
105+
"ptr.hash": "hash1",
106+
"ptr.kind": "test.kind1"
107+
},
108+
"span_id": 0,
109+
"trace_id": 0,
110+
"trace_id_high": 0,
111+
"tracestate": "",
112+
"flags": 0
113+
},
114+
{
115+
"attributes": {
116+
"link.kind": "span-pointer",
117+
"ptr.dir": "u",
118+
"ptr.hash": "hash2",
119+
"ptr.kind": "test.kind2"
120+
},
121+
"span_id": 0,
122+
"trace_id": 0,
123+
"trace_id_high": 0,
124+
"tracestate": "",
125+
"flags": 0
126+
}
127+
])),
128+
},
129+
SpanPointerTestCase {
130+
test_name: "handles empty span pointers",
131+
existing_links: None,
132+
span_pointers: Some(vec![]),
133+
expected_links: None,
134+
},
135+
SpanPointerTestCase {
136+
test_name: "handles None span pointers",
137+
existing_links: None,
138+
span_pointers: None,
139+
expected_links: None,
140+
},
141+
SpanPointerTestCase {
142+
test_name: "appends to existing span links",
143+
existing_links: Some(json!([{
144+
"attributes": {
145+
"link.kind": "span-pointer",
146+
"ptr.dir": "d",
147+
"ptr.hash": "hash1",
148+
"ptr.kind": "test.kind1"
149+
},
150+
"span_id": 0,
151+
"trace_id": 0,
152+
"trace_id_high": 0,
153+
"tracestate": "",
154+
"flags": 0
155+
}])),
156+
span_pointers: Some(vec![SpanPointer {
157+
hash: "hash2".to_string(),
158+
kind: "test.kind2".to_string(),
159+
}]),
160+
expected_links: Some(json!([
161+
{
162+
"attributes": {
163+
"link.kind": "span-pointer",
164+
"ptr.dir": "d",
165+
"ptr.hash": "hash1",
166+
"ptr.kind": "test.kind1"
167+
},
168+
"span_id": 0,
169+
"trace_id": 0,
170+
"trace_id_high": 0,
171+
"tracestate": "",
172+
"flags": 0
173+
},
174+
{
175+
"attributes": {
176+
"link.kind": "span-pointer",
177+
"ptr.dir": "u",
178+
"ptr.hash": "hash2",
179+
"ptr.kind": "test.kind2"
180+
},
181+
"span_id": 0,
182+
"trace_id": 0,
183+
"trace_id_high": 0,
184+
"tracestate": "",
185+
"flags": 0
186+
}
187+
])),
188+
},
189+
];
190+
191+
for case in test_cases {
192+
let mut test_span = TestSpan {
193+
meta: HashMap::new(),
194+
};
195+
196+
// Set up existing links if any
197+
if let Some(links) = case.existing_links {
198+
test_span
199+
.meta
200+
.insert("_dd.span_links".to_string(), links.to_string());
201+
}
202+
203+
attach_span_pointers_to_meta(&mut test_span.meta, &case.span_pointers);
204+
205+
match case.expected_links {
206+
Some(expected) => {
207+
let span_links = test_span.meta.get("_dd.span_links").unwrap_or_else(|| {
208+
panic!(
209+
"[{}] _dd.span_links should be present in span meta",
210+
case.test_name
211+
)
212+
});
213+
let actual_links: serde_json::Value =
214+
serde_json::from_str(span_links).expect("Should be valid JSON");
215+
assert_eq!(
216+
actual_links, expected,
217+
"Failed test case: {}",
218+
case.test_name
219+
);
220+
}
221+
None => {
222+
assert!(
223+
!test_span.meta.contains_key("_dd.span_links"),
224+
"Failed test case: {}",
225+
case.test_name
226+
);
227+
}
228+
}
229+
}
230+
}
231+
232+
#[test]
233+
fn test_generate_span_pointer_hash() {
234+
let test_cases = vec![
235+
(
236+
"basic values",
237+
vec!["some-bucket", "some-key.data", "ab12ef34"],
238+
"e721375466d4116ab551213fdea08413",
239+
),
240+
(
241+
"non-ascii key",
242+
vec!["some-bucket", "some-key.你好", "ab12ef34"],
243+
"d1333a04b9928ab462b5c6cadfa401f4",
244+
),
245+
(
246+
"multipart-upload",
247+
vec!["some-bucket", "some-key.data", "ab12ef34-5"],
248+
"2b90dffc37ebc7bc610152c3dc72af9f",
249+
),
250+
];
251+
252+
for (name, components, expected_hash) in test_cases {
253+
let actual_hash = generate_span_pointer_hash(&components);
254+
assert_eq!(actual_hash, expected_hash, "Test case: {name}");
255+
}
256+
}
257+
}

0 commit comments

Comments
 (0)