Skip to content

Commit 513f0ca

Browse files
committed
handle MSK events
1 parent dd17d4a commit 513f0ca

File tree

2 files changed

+69
-1
lines changed

2 files changed

+69
-1
lines changed

bottlecap/src/lifecycle/invocation/triggers/msk_event.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ pub struct MSKRecord {
2121
pub topic: String,
2222
pub partition: i32,
2323
pub timestamp: f64,
24+
#[serde(default)]
25+
pub headers: Vec<HashMap<String, Vec<u8>>>,
2426
}
2527

2628
impl Trigger for MSKEvent {
@@ -105,7 +107,17 @@ impl Trigger for MSKEvent {
105107
}
106108

107109
fn get_carrier(&self) -> HashMap<String, String> {
108-
HashMap::new()
110+
let mut carrier = HashMap::new();
111+
if let Some(record) = self.records.values().find_map(|arr| arr.first()) {
112+
for header_map in &record.headers {
113+
for (key, value_bytes) in header_map {
114+
if let Ok(value_str) = String::from_utf8(value_bytes.clone()) {
115+
carrier.insert(key.to_lowercase(), value_str);
116+
}
117+
}
118+
}
119+
}
120+
carrier
109121
}
110122

111123
fn is_async(&self) -> bool {
@@ -142,6 +154,7 @@ mod tests {
142154
topic: String::from("topic1"),
143155
partition: 0,
144156
timestamp: 1_745_846_213_022f64,
157+
headers: vec![],
145158
};
146159
let mut expected_records = HashMap::new();
147160
expected_records.insert(String::from("topic1"), vec![record]);
@@ -335,4 +348,36 @@ mod tests {
335348
"msk" // fallback value
336349
);
337350
}
351+
352+
#[test]
353+
fn test_new_with_headers() {
354+
let json = read_json_file("msk_event_with_headers.json");
355+
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
356+
let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent");
357+
358+
let record = result.records.values().find_map(|arr| arr.first()).unwrap();
359+
assert_eq!(record.topic, "topic1");
360+
assert_eq!(record.headers.len(), 3);
361+
}
362+
363+
#[test]
364+
fn test_get_carrier_with_headers() {
365+
let json = read_json_file("msk_event_with_headers.json");
366+
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
367+
let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent");
368+
let carrier = event.get_carrier();
369+
370+
assert_eq!(
371+
carrier.get("x-datadog-trace-id").map(String::as_str),
372+
Some("36979754430890456950")
373+
);
374+
assert_eq!(
375+
carrier.get("x-datadog-parent-id").map(String::as_str),
376+
Some("7431398482019833808")
377+
);
378+
assert_eq!(
379+
carrier.get("x-datadog-sampling-priority").map(String::as_str),
380+
Some("1")
381+
);
382+
}
338383
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
{
2+
"eventSource": "aws:kafka",
3+
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
4+
"bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-2.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
5+
"records": {
6+
"topic1": [
7+
{
8+
"topic": "topic1",
9+
"partition": 0,
10+
"offset": 101,
11+
"timestamp": 1745846213022,
12+
"timestampType":"CREATE_TIME",
13+
"key": "b3JkZXJJZA==",
14+
"value": "eyJvcmRlcklkIjoiMTIzNCIsImFtb3VudCI6MTAwLjAxfQ==",
15+
"headers": [
16+
{"x-datadog-trace-id": [51, 54, 57, 55, 57, 55, 53, 52, 52, 51, 48, 56, 57, 48, 52, 53, 54, 57, 53, 48]},
17+
{"x-datadog-parent-id": [55, 52, 51, 49, 51, 57, 56, 52, 56, 50, 48, 49, 57, 56, 51, 51, 56, 48, 56]},
18+
{"x-datadog-sampling-priority": [49]}
19+
]
20+
}
21+
]
22+
}
23+
}

0 commit comments

Comments
 (0)