Skip to content

Commit 293c624

Browse files
authored
DynamoDB Span Pointers (#619)
### What does this PR do? Adds span pointers to spans for Lambdas triggered by DynamoDB streams. This change will affect the downstream case for Universal Instrumentation Lambda runtimes (Java, .NET, Golang). Span pointers are similar to [Span Links](https://docs.datadoghq.com/tracing/trace_collection/span_links/), but for cases when it is impossible to pass the Trace ID and Span ID between the spans that need to be linked. When the calculated hashes for the upstream and downstream lambdas match, the Datadog frontend will automatically link the two traces together. Upstream case (requires tracer to be instrumented): <img width="1200" alt="Screenshot 2025-03-24 at 1 46 12 PM" src="https://github.com/user-attachments/assets/85969fba-a032-459d-9e29-5bd285793f7f" /> Downstream case (this code): <img width="1200" alt="Screenshot 2025-03-24 at 1 46 56 PM" src="https://github.com/user-attachments/assets/016f0f8f-7399-40db-8077-bb3cb24d2807" /> When clicking on the linked span, a new tab opens linking to the opposite Lambda function. ### Describe how you validated your changes Mostly manual testing, but I also added unit tests. There are also functional tests in https://github.com/DataDog/serverless-e2e-tests -- DynamoDB is TODO but will be implemented soon.
1 parent e177d55 commit 293c624

File tree

2 files changed

+168
-0
lines changed

2 files changed

+168
-0
lines changed

bottlecap/src/lifecycle/invocation/span_inferrer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ impl SpanInferrer {
176176
} else if DynamoDbRecord::is_match(payload_value) {
177177
if let Some(t) = DynamoDbRecord::new(payload_value.clone()) {
178178
t.enrich_span(&mut inferred_span, &self.service_mapping);
179+
self.span_pointers = t.get_span_pointers();
179180

180181
trigger = Some(Box::new(t));
181182
}

bottlecap/src/lifecycle/invocation/triggers/dynamodb_event.rs

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use base64::{engine::general_purpose::STANDARD, Engine};
12
use datadog_trace_protobuf::pb::Span;
23
use serde::{Deserialize, Serialize};
34
use serde_json::Value;
@@ -8,6 +9,7 @@ use crate::lifecycle::invocation::{
89
processor::S_TO_NS,
910
triggers::{ServiceNameResolver, Trigger, FUNCTION_TRIGGER_EVENT_SOURCE_TAG},
1011
};
12+
use crate::traces::span_pointers::{generate_span_pointer_hash, SpanPointer};
1113

1214
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
1315
pub struct DynamoDbEvent {
@@ -37,6 +39,31 @@ pub struct DynamoDbEntity {
3739
pub size_bytes: i64,
3840
#[serde(rename = "StreamViewType")]
3941
pub stream_view_type: String,
42+
#[serde(rename = "Keys")]
43+
pub keys: HashMap<String, AttributeValue>,
44+
}
45+
46+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
47+
// An attribute value is formatted like this: {"S": "string_value"}
48+
// and it can be a string, number (as a string), or binary value (as a Base64-encoded string).
49+
pub enum AttributeValue {
50+
S(String),
51+
N(String),
52+
B(String),
53+
}
54+
55+
impl AttributeValue {
56+
fn to_string(&self) -> Option<String> {
57+
match self {
58+
AttributeValue::S(string_value) => Some(string_value.clone()),
59+
AttributeValue::N(number_value) => Some(number_value.clone()),
60+
// Convert Base64-encoded string to original string
61+
AttributeValue::B(binary_value) => STANDARD
62+
.decode(binary_value)
63+
.ok()
64+
.and_then(|bytes| String::from_utf8(bytes).ok()),
65+
}
66+
}
4067
}
4168

4269
impl Trigger for DynamoDbRecord {
@@ -143,6 +170,61 @@ impl ServiceNameResolver for DynamoDbRecord {
143170
}
144171
}
145172

173+
impl DynamoDbRecord {
174+
#[must_use]
175+
pub fn get_span_pointers(&self) -> Option<Vec<SpanPointer>> {
176+
if self.dynamodb.keys.is_empty() {
177+
return None;
178+
}
179+
180+
let table_name = self.get_specific_identifier();
181+
182+
// DynamoDB tables have either one primary key (partition key) or two primary keys (partition + sort)
183+
#[allow(clippy::single_match_else)]
184+
let (primary_key1, value1, primary_key2, value2) = match self.dynamodb.keys.len() {
185+
1 => {
186+
let (key, attr_value) = self
187+
.dynamodb
188+
.keys
189+
.iter()
190+
.next()
191+
.expect("No DynamoDB keys found");
192+
193+
let value = attr_value.to_string()?;
194+
(key.clone(), value, String::new(), String::new())
195+
}
196+
_ => {
197+
// For two keys, sort lexicographically for consistent ordering
198+
let mut keys: Vec<(&String, &AttributeValue)> = self.dynamodb.keys.iter().collect();
199+
keys.sort_by(|a, b| a.0.cmp(b.0));
200+
201+
let (k1, attr1) = keys[0];
202+
// If unable to get string value, just return None
203+
let v1 = attr1.to_string()?;
204+
205+
let (k2, attr2) = keys[1];
206+
let v2 = attr2.to_string()?;
207+
208+
(k1.clone(), v1, k2.clone(), v2)
209+
}
210+
};
211+
212+
let parts = [
213+
table_name.as_str(),
214+
primary_key1.as_str(),
215+
value1.as_str(),
216+
primary_key2.as_str(),
217+
value2.as_str(),
218+
];
219+
let hash = generate_span_pointer_hash(&parts);
220+
221+
Some(vec![SpanPointer {
222+
hash,
223+
kind: String::from("aws.dynamodb.item"),
224+
}])
225+
}
226+
}
227+
146228
#[cfg(test)]
147229
mod tests {
148230
use super::*;
@@ -154,11 +236,15 @@ mod tests {
154236
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
155237
let result = DynamoDbRecord::new(payload).expect("Failed to deserialize into Record");
156238

239+
let mut expected_keys = HashMap::new();
240+
expected_keys.insert("Id".to_string(), AttributeValue::N("101".to_string()));
241+
157242
let expected = DynamoDbRecord {
158243
dynamodb: DynamoDbEntity {
159244
approximate_creation_date_time: 1_428_537_600.0,
160245
size_bytes: 26,
161246
stream_view_type: String::from("NEW_AND_OLD_IMAGES"),
247+
keys: expected_keys,
162248
},
163249
event_id: String::from("c4ca4238a0b923820dcc509a6f75849b"),
164250
event_name: String::from("INSERT"),
@@ -280,4 +366,85 @@ mod tests {
280366
"generic-service"
281367
);
282368
}
369+
370+
#[test]
371+
fn test_get_span_pointers_single_key() {
372+
let mut keys = HashMap::new();
373+
keys.insert("id".to_string(), AttributeValue::S("abc123".to_string()));
374+
375+
let event = DynamoDbRecord {
376+
dynamodb: DynamoDbEntity {
377+
approximate_creation_date_time: 0.0,
378+
size_bytes: 26,
379+
stream_view_type: String::from("NEW_AND_OLD_IMAGES"),
380+
keys,
381+
},
382+
event_id: String::from("abc123"),
383+
event_name: String::from("INSERT"),
384+
event_version: String::from("1.1"),
385+
event_source_arn: String::from("arn:aws:dynamodb:us-east-1:123456789012:table/TestTable/stream/2015-06-27T00:48:05.899"),
386+
};
387+
388+
let span_pointers = event.get_span_pointers().expect("Should return Some(vec)");
389+
assert_eq!(span_pointers.len(), 1);
390+
assert_eq!(span_pointers[0].kind, "aws.dynamodb.item");
391+
assert_eq!(span_pointers[0].hash, "69706c9e1e41a2f0cf8c0650f91cb0c2");
392+
}
393+
394+
#[test]
395+
fn test_get_span_pointers_mixed_keys() {
396+
let mut keys = HashMap::new();
397+
keys.insert("num_key".to_string(), AttributeValue::N("42".to_string()));
398+
keys.insert(
399+
"bin_key".to_string(),
400+
AttributeValue::B(STANDARD.encode("Hello World".as_bytes())),
401+
);
402+
403+
let event = DynamoDbRecord {
404+
dynamodb: DynamoDbEntity {
405+
approximate_creation_date_time: 0.0,
406+
size_bytes: 26,
407+
stream_view_type: String::from("NEW_AND_OLD_IMAGES"),
408+
keys,
409+
},
410+
event_id: String::from("123abc"),
411+
event_name: String::from("INSERT"),
412+
event_version: String::from("1.1"),
413+
event_source_arn: String::from("arn:aws:dynamodb:us-east-1:123456789012:table/TestTable/stream/2015-06-27T00:48:05.899"),
414+
};
415+
416+
let span_pointers = event.get_span_pointers().expect("Should return Some(vec)");
417+
assert_eq!(span_pointers.len(), 1);
418+
assert_eq!(span_pointers[0].kind, "aws.dynamodb.item");
419+
assert_eq!(span_pointers[0].hash, "2031d2d69b45adc3d5c27691924ddfcc");
420+
}
421+
422+
#[test]
423+
fn test_get_span_pointers_lexicographical_ordering() {
424+
// Same as previous test but with keys in reverse order to test sorting
425+
let mut keys = HashMap::new();
426+
keys.insert(
427+
"bin_key".to_string(),
428+
AttributeValue::B(STANDARD.encode("Hello World".as_bytes())),
429+
);
430+
keys.insert("num_key".to_string(), AttributeValue::N("42".to_string()));
431+
432+
let event = DynamoDbRecord {
433+
dynamodb: DynamoDbEntity {
434+
approximate_creation_date_time: 0.0,
435+
size_bytes: 26,
436+
stream_view_type: String::from("NEW_AND_OLD_IMAGES"),
437+
keys,
438+
},
439+
event_id: String::from("123abc"),
440+
event_name: String::from("INSERT"),
441+
event_version: String::from("1.1"),
442+
event_source_arn: String::from("arn:aws:dynamodb:us-east-1:123456789012:table/TestTable/stream/2015-06-27T00:48:05.899"),
443+
};
444+
445+
let span_pointers = event.get_span_pointers().expect("Should return Some(vec)");
446+
assert_eq!(span_pointers.len(), 1);
447+
assert_eq!(span_pointers[0].kind, "aws.dynamodb.item");
448+
assert_eq!(span_pointers[0].hash, "2031d2d69b45adc3d5c27691924ddfcc"); // same as previous test
449+
}
283450
}

0 commit comments

Comments
 (0)