Commit f7cc7a0

joeyzhao2018, claude, and Copilot authored
feat(triggers): handle MSK events (#1066)
https://datadoghq.atlassian.net/browse/SLES-2739

In Kafka's wire protocol (KIP-82), header values are always byte[]. Every Kafka client library enforces this:

| Tracer | Injection code | Mechanism |
| ----------- | ----------- | ----------- |
| dd-trace-java | `headers.add(key, value.getBytes(UTF_8))` | `String.getBytes()` → `byte[]` |
| dd-trace-go | `Value: []byte(val)` | Go type conversion → `[]byte` |
| dd-trace-dotnet | `_headers.Add(name, Encoding.UTF8.GetBytes(value))` | `UTF8.GetBytes()` → `byte[]` |

All three tracers accept string trace context values from the propagation layer, convert them to UTF-8 bytes at the carrier adapter boundary, and hand byte[] to the Kafka client. This isn't a quirk of Java's getBytes(); it's the only way Kafka headers work.

### What MSK Lambda does

When MSK triggers a Lambda, AWS serializes the Kafka record to JSON. Since header values are byte[] on the wire, AWS encodes them as decimal byte values. However, the exact JSON shape depends on the Lambda runtime:

- Array format (observed in the existing msk_event.json testing payloads; to be safe, I left support for it unchanged): byte values are a JSON array of integers, e.g. `"headers": [{"x-datadog-trace-id": [51, 54, 57, ...]}]`
- Object format (observed with the Java Lambda runtime): the records list, the headers list, and most per-header byte values are JSON objects with numeric string keys, and byte values are decimal strings. Some header values still arrive as arrays of decimal strings, e.g. `"records": { "topic-0": { "0": { "headers": { "0": {"someOtherHeader": ["70", "114", ...]}, "2": {"x-datadog-trace-id": {"0":"52","1":"54",...}}, "4": {"x-datadog-sampling-priority": ["49"]} } } } }`
- Note that Datadog headers can appear at any index; non-instrumentation headers may precede them.

### What's the difference between the [msk_event.json](https://github.com/DataDog/datadog-lambda-extension/blob/main/bottlecap/tests/payloads/msk_event.json) and the newly added `msk_event_with_headers.json` here?

- msk_event.json represents a standard MSK trigger where the producer didn't attach any Kafka headers, i.e. no Datadog tracer was running on the producer side (or it's a non-instrumented producer like a raw Kafka client, a Kinesis Firehose delivery stream, or a schema-registry message). In those cases Lambda still delivers the event but with `"headers": []`. It's also the format you get when testing MSK triggers manually in the AWS console, which doesn't inject headers. (source: Claude Code)
- msk_event_with_headers.json reflects the real-world object format produced by the Java Lambda runtime, with a producer instrumented with a Datadog tracer injecting trace context as Kafka headers. It includes non-Datadog headers at lower indices to verify that the carrier extraction correctly finds Datadog headers regardless of their position. (source: I ran a real-world example; the screenshot below is the evidence from that test)

<img width="1753" height="442" alt="Screenshot 2026-03-12 at 11 14 33 PM" src="https://github.com/user-attachments/assets/f354dc54-77aa-4dcd-9e84-df4dc36102ca" />

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
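The two header-value shapes described in the commit message can be sketched with the standard library alone. This is a minimal illustration rather than the code from this commit: `decode_array_form` and `decode_object_form` are hypothetical names, and the inputs are modelled as plain string slices and key/value pairs instead of `serde_json::Value`. Unlike the commit's object branch, which skips entries it cannot parse, this sketch returns `None` on any invalid entry.

```rust
/// Hypothetical helper: decodes the array form, where each element is a
/// decimal byte value written as a string (for example `["49"]`).
fn decode_array_form(values: &[&str]) -> Option<String> {
    // Any element that is not a valid u8 makes the whole decode fail.
    let bytes: Option<Vec<u8>> = values.iter().map(|v| v.parse::<u8>().ok()).collect();
    String::from_utf8(bytes?).ok()
}

/// Hypothetical helper: decodes the object form, where keys are numeric
/// strings giving each byte's position (for example `{"0":"52","1":"54"}`).
/// Pairs are sorted by numeric key so the result does not depend on the
/// order in which the entries were supplied.
fn decode_object_form(pairs: &[(&str, &str)]) -> Option<String> {
    let mut indexed: Vec<(u64, u8)> = Vec::with_capacity(pairs.len());
    for (k, v) in pairs {
        indexed.push((k.parse::<u64>().ok()?, v.parse::<u8>().ok()?));
    }
    indexed.sort_by_key(|(idx, _)| *idx);
    String::from_utf8(indexed.into_iter().map(|(_, b)| b).collect()).ok()
}
```

For instance, `decode_array_form(&["49", "50", "51"])` yields `Some("123")`, the same ASCII mapping used in the commit's tests, and `decode_object_form(&[("1", "54"), ("0", "52")])` yields `Some("46")` regardless of entry order.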
1 parent 8736bc9 commit f7cc7a0

2 files changed: +323 -16 lines changed

bottlecap/src/lifecycle/invocation/triggers/msk_event.rs

Lines changed: 297 additions & 16 deletions
@@ -21,20 +21,164 @@ pub struct MSKRecord {
     pub topic: String,
     pub partition: i32,
     pub timestamp: f64,
+    #[serde(default)]
+    pub headers: Value,
+}
+
+/// Decodes a header value into raw bytes. Two formats have been observed:
+///
+/// - **Array**: elements are integers `[104, 101]` or decimal strings `["49"]`
+/// - **Object** with numeric string keys and decimal string values: `{"0":"52","1":"54",...}`
+fn bytes_from_header_value(val: &Value) -> Option<Vec<u8>> {
+    match val {
+        // Array format: elements may be integers `[104, 101]` or decimal strings `["49"]`
+        Value::Array(arr) => arr
+            .iter()
+            .map(|v| match v {
+                Value::Number(n) => n.as_u64().and_then(|n| u8::try_from(n).ok()),
+                Value::String(s) => s.parse::<u8>().ok(),
+                _ => None,
+            })
+            .collect(),
+        // Object format with numeric string keys and decimal string values: `{"0":"52","1":"54",...}`
+        Value::Object(obj) => {
+            let mut pairs: Vec<(u64, u8)> = obj
+                .iter()
+                .filter_map(|(k, v)| {
+                    Some((k.parse::<u64>().ok()?, v.as_str()?.parse::<u8>().ok()?))
+                })
+                .collect();
+            pairs.sort_by_key(|(idx, _)| *idx);
+            Some(pairs.into_iter().map(|(_, b)| b).collect())
+        }
+        _ => None,
+    }
+}
+
+/// Returns true if the `headers` JSON contains a trace context header key.
+/// This performs a lightweight scan of the raw JSON structure without decoding
+/// header values or allocating intermediate collections.
+fn headers_has_trace_context(headers: &Value) -> bool {
+    let is_trace_context_entry = |entry: &Value| {
+        if let Value::Object(header_map) = entry {
+            header_map.keys().any(|k| {
+                k.eq_ignore_ascii_case("x-datadog-trace-id")
+                    || k.eq_ignore_ascii_case("traceparent")
+            })
+        } else {
+            false
+        }
+    };
+    match headers {
+        Value::Array(arr) => arr.iter().any(is_trace_context_entry),
+        Value::Object(obj) => obj.values().any(is_trace_context_entry),
+        _ => false,
+    }
+}
+
+/// Scans all records in the records map and returns the `(topic_key, record_value)` of the first
+/// record whose headers contain a tracecontext key. Returns `None` if none found.
+fn find_record_with_trace_context(
+    records_map: &serde_json::Map<String, Value>,
+) -> Option<(String, Value)> {
+    for (key, group) in records_map {
+        match group {
+            Value::Array(arr) => {
+                for record in arr {
+                    if let Some(headers) = record.get("headers")
+                        && headers_has_trace_context(headers)
+                    {
+                        return Some((key.clone(), record.clone()));
+                    }
+                }
+            }
+            Value::Object(obj) => {
+                for record in obj.values() {
+                    if let Some(headers) = record.get("headers")
+                        && headers_has_trace_context(headers)
+                    {
+                        return Some((key.clone(), record.clone()));
+                    }
+                }
+            }
+            _ => {}
+        }
+    }
+    None
+}
+
+/// Decodes an MSK record's `headers` field into a `HashMap<String, String>` by converting
+/// each header's byte values to a UTF-8 string. The `headers` field may be either a JSON
+/// array or a JSON object with numeric string keys, one entry per Kafka header, ordered by index.
+fn headers_to_string_map(headers: &Value) -> HashMap<String, String> {
+    let mut carrier = HashMap::new();
+
+    match headers {
+        Value::Array(arr) => {
+            for entry in arr {
+                if let Value::Object(header_map) = entry {
+                    for (key, val) in header_map {
+                        if let Some(bytes) = bytes_from_header_value(val)
+                            && let Ok(s) = String::from_utf8(bytes)
+                        {
+                            carrier.insert(key.to_lowercase(), s);
+                        }
+                    }
+                }
+            }
+        }
+        // Object format: numeric string keys are just ordering artifacts from the Java runtime;
+        // insertion order into the HashMap doesn't matter so no sort needed.
+        Value::Object(obj) => {
+            for entry in obj.values() {
+                if let Value::Object(header_map) = entry {
+                    for (key, val) in header_map {
+                        if let Some(bytes) = bytes_from_header_value(val)
+                            && let Ok(s) = String::from_utf8(bytes)
+                        {
+                            carrier.insert(key.to_lowercase(), s);
+                        }
+                    }
+                }
+            }
+        }
+        _ => {}
+    }
+
+    carrier
 }

 impl Trigger for MSKEvent {
     fn new(mut payload: Value) -> Option<Self> {
-        // We only care about the first item in the first record, so drop the others before deserializing.
-        if let Some(records_map) = payload.get_mut("records").and_then(Value::as_object_mut) {
-            match records_map.iter_mut().next() {
-                Some((first_key, Value::Array(arr))) => {
-                    arr.truncate(1);
-                    let key = first_key.clone();
-                    records_map.retain(|k, _| k == &key);
-                }
-                _ => {
-                    records_map.clear();
+        // We only need one record: prefer the first one carrying Datadog trace context so we can
+        // propagate the trace, falling back to the very first record otherwise. Records may be
+        // delivered as a JSON object with numeric string keys; normalize to a single-element array
+        // before deserializing.
+        let chosen = payload
+            .get("records")
+            .and_then(Value::as_object)
+            .and_then(find_record_with_trace_context);
+
+        if let Some((chosen_key, chosen_record)) = chosen {
+            let records_map = payload.get_mut("records").and_then(Value::as_object_mut)?;
+            records_map.retain(|k, _| k == &chosen_key);
+            if let Some(entry) = records_map.get_mut(&chosen_key) {
+                *entry = Value::Array(vec![chosen_record]);
+            }
+        } else {
+            // Fallback: no record with Datadog trace context; normalize to the very first record
+            // without cloning the full record payload.
+            let records_map = payload.get_mut("records").and_then(Value::as_object_mut)?;
+            let first_key = records_map.keys().next()?.to_owned();
+            records_map.retain(|k, _| k == &first_key);
+            if let Some(entry) = records_map.get_mut(&first_key) {
+                match entry {
+                    Value::Array(arr) => arr.truncate(1),
+                    Value::Object(obj) => {
+                        let first_record = obj.values().next()?.clone();
+                        *entry = Value::Array(vec![first_record]);
+                    }
+                    _ => return None,
                 }
             }
         }
@@ -49,13 +193,16 @@ impl Trigger for MSKEvent {
     }

     fn is_match(payload: &Value) -> bool {
-        payload
+        let first_record_group = payload
             .get("records")
             .and_then(Value::as_object)
-            .and_then(|map| map.values().next())
-            .and_then(Value::as_array)
-            .and_then(|arr| arr.first())
-            .is_some_and(|rec| rec.get("topic").is_some())
+            .and_then(|map| map.values().next());
+        let first_record = match first_record_group {
+            Some(Value::Array(arr)) => arr.first(),
+            Some(Value::Object(obj)) => obj.values().next(),
+            _ => return false,
+        };
+        first_record.is_some_and(|rec| rec.get("topic").is_some())
     }

     #[allow(clippy::cast_possible_truncation)]
@@ -105,7 +252,12 @@ impl Trigger for MSKEvent {
     }

     fn get_carrier(&self) -> HashMap<String, String> {
-        HashMap::new()
+        self.records
+            .values()
+            .find_map(|arr| arr.first())
+            .map_or_else(HashMap::new, |record| {
+                headers_to_string_map(&record.headers)
+            })
     }

     fn is_async(&self) -> bool {
@@ -142,6 +294,7 @@ mod tests {
             topic: String::from("topic1"),
             partition: 0,
             timestamp: 1_745_846_213_022f64,
+            headers: Value::Array(vec![]),
         };
         let mut expected_records = HashMap::new();
         expected_records.insert(String::from("topic1"), vec![record]);
@@ -335,4 +488,132 @@ mod tests {
             "msk" // fallback value
         );
     }
+
+    #[test]
+    fn test_new_with_headers() {
+        let json = read_json_file("msk_event_with_headers.json");
+        let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
+        let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent");
+
+        let record = result
+            .records
+            .values()
+            .find_map(|arr| arr.first())
+            .expect("Expected at least one record");
+        assert_eq!(record.topic, "demo-topic");
+        // headers is an object with 6 entries (2 non-datadog + 4 datadog)
+        assert_eq!(
+            record.headers.as_object().map(serde_json::Map::len),
+            Some(6)
+        );
+    }
+
+    #[test]
+    fn test_is_match_with_headers() {
+        let json = read_json_file("msk_event_with_headers.json");
+        let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
+
+        assert!(MSKEvent::is_match(&payload));
+    }
+
+    #[test]
+    fn test_new_prefers_record_with_trace_context() {
+        // Two records in topic1: first has no headers, second has x-datadog-trace-id.
+        // [49, 50, 51] = ASCII "123"
+        let payload = serde_json::json!({
+            "eventSource": "aws:kafka",
+            "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
+            "bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+            "records": {
+                "topic1": [
+                    {
+                        "topic": "topic1", "partition": 0, "offset": 100,
+                        "timestamp": 1000.0, "timestampType": "CREATE_TIME",
+                        "key": null, "value": null,
+                        "headers": []
+                    },
+                    {
+                        "topic": "topic1", "partition": 0, "offset": 101,
+                        "timestamp": 2000.0, "timestampType": "CREATE_TIME",
+                        "key": null, "value": null,
+                        "headers": [{"x-datadog-trace-id": [49, 50, 51]}]
+                    }
+                ]
+            }
+        });
+
+        let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent");
+        let carrier = event.get_carrier();
+        assert_eq!(
+            carrier.get("x-datadog-trace-id").map(String::as_str),
+            Some("123"),
+            "Should pick the record with trace context, not the first one"
+        );
+    }
+
+    #[test]
+    fn test_get_carrier_with_headers() {
+        let json = read_json_file("msk_event_with_headers.json");
+        let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
+        let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent");
+        let carrier = event.get_carrier();
+
+        // Datadog headers appear at indices 2-5; non-datadog headers at 0-1 are also decoded
+        // but won't be used by the propagator.
+        assert_eq!(
+            carrier.get("x-datadog-trace-id").map(String::as_str),
+            Some("1497116011738644768")
+        );
+        assert_eq!(
+            carrier.get("x-datadog-parent-id").map(String::as_str),
+            Some("2239801583077304042")
+        );
+        assert_eq!(
+            carrier
+                .get("x-datadog-sampling-priority")
+                .map(String::as_str),
+            Some("1")
+        );
+        assert_eq!(
+            carrier.get("x-datadog-tags").map(String::as_str),
+            Some("_dd.p.dm=-1,_dd.p.tid=699c836500000000")
+        );
+    }
+
+    /// Verifies that a Java-runtime-format payload (records as object with numeric string keys)
+    /// without any trace context falls back to the first record and deserializes successfully.
+    #[test]
+    fn test_new_java_format_no_trace_context() {
+        let payload = serde_json::json!({
+            "eventSource": "aws:kafka",
+            "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
+            "bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+            "records": {
+                "demo-topic-0": {
+                    "0": {
+                        "topic": "demo-topic", "partition": 0, "offset": 5,
+                        "timestamp": 1000.0, "timestampType": "CREATE_TIME",
+                        "key": null, "value": null,
+                        "headers": {}
+                    },
+                    "1": {
+                        "topic": "demo-topic", "partition": 0, "offset": 6,
+                        "timestamp": 2000.0, "timestampType": "CREATE_TIME",
+                        "key": null, "value": null,
+                        "headers": {}
+                    }
+                }
+            }
+        });
+
+        let event = MSKEvent::new(payload).expect("Should deserialize despite no trace context");
+        let record = event
+            .records
+            .values()
+            .find_map(|arr| arr.first())
+            .expect("Expected at least one record");
+        assert_eq!(record.topic, "demo-topic");
+        assert_eq!(record.partition, 0);
+        assert!(event.get_carrier().is_empty());
+    }
 }
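The record-selection policy that `new` applies (prefer the first record whose headers carry a trace context key, otherwise fall back to the very first record) can be sketched on simplified types. This is an illustration under stated assumptions, not the commit's implementation: `Record`, `has_trace_context`, and `pick_record` are hypothetical names, and the commit itself works directly on `serde_json::Value`.

```rust
/// Hypothetical, simplified record: only the offset and header names matter here.
struct Record {
    offset: u64,
    header_names: Vec<String>,
}

/// Returns true if any header name is a trace context key. The comparison is
/// ASCII case-insensitive, mirroring the keys checked in the diff above.
fn has_trace_context(record: &Record) -> bool {
    record.header_names.iter().any(|name| {
        name.eq_ignore_ascii_case("x-datadog-trace-id")
            || name.eq_ignore_ascii_case("traceparent")
    })
}

/// Hypothetical helper: prefer the first record with trace context, otherwise
/// fall back to the first record. Returns `None` only for an empty slice.
fn pick_record(records: &[Record]) -> Option<&Record> {
    records
        .iter()
        .find(|r| has_trace_context(r))
        .or_else(|| records.first())
}
```

With two records at offsets 100 (no headers) and 101 (an `x-datadog-trace-id` header), `pick_record` returns the record at offset 101; with no instrumented record it returns the first one, which is the behaviour the tests above exercise.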
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
+{
+  "eventSource": "aws:kafka",
+  "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
+  "bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-2.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+  "records": {
+    "demo-topic-0": {
+      "0": {
+        "topic": "demo-topic",
+        "partition": 0,
+        "offset": 101,
+        "timestamp": 1745846213022,
+        "timestampType": "CREATE_TIME",
+        "key": "b3JkZXJJZA==",
+        "value": "eyJvcmRlcklkIjoiMTIzNCIsImFtb3VudCI6MTAwLjAxfQ==",
+        "headers": {
+          "0": {"someId": ["70","114","111","109","66","114","117","110","111"]},
+          "1": {"anotherId": {"0":"55","1":"52","2":"50","3":"101","4":"101","5":"101","6":"52","7":"57","8":"45","9":"101","10":"51","11":"100","12":"55","13":"45","14":"52","15":"51","16":"52","17":"54","18":"45","19":"97","20":"54","21":"57","22":"57","23":"45","24":"52","25":"100","26":"49","27":"56","28":"101","29":"99","30":"98","31":"53","32":"53","33":"101","34":"50","35":"99"}},
+          "2": {"x-datadog-trace-id": {"0":"49","1":"52","2":"57","3":"55","4":"49","5":"49","6":"54","7":"48","8":"49","9":"49","10":"55","11":"51","12":"56","13":"54","14":"52","15":"52","16":"55","17":"54","18":"56"}},
+          "3": {"x-datadog-parent-id": {"0":"50","1":"50","2":"51","3":"57","4":"56","5":"48","6":"49","7":"53","8":"56","9":"51","10":"48","11":"55","12":"55","13":"51","14":"48","15":"52","16":"48","17":"52","18":"50"}},
+          "4": {"x-datadog-sampling-priority": ["49"]},
+          "5": {"x-datadog-tags": {"0":"95","1":"100","2":"100","3":"46","4":"112","5":"46","6":"100","7":"109","8":"61","9":"45","10":"49","11":"44","12":"95","13":"100","14":"100","15":"46","16":"112","17":"46","18":"116","19":"105","20":"100","21":"61","22":"54","23":"57","24":"57","25":"99","26":"56","27":"51","28":"54","29":"53","30":"48","31":"48","32":"48","33":"48","34":"48","35":"48","36":"48","37":"48"}}
+        }
+      }
+    }
+  }
+}
