Skip to content

Commit 57315b8

Browse files
authored
[SVLS-8231] fixing log requestId for LMI (#973)
https://datadoghq.atlassian.net/browse/SVLS-8231 ## Overview * Bug fix - When running a LMI function, function logs don't get attributed with `lambda.requestId`. As a result, no function logs appear for an invocation. They still show up for the function, just not not on the invocation. * We could have alternatively updated the UI to search for `lambda.requestId` and `requestId` instead of just searching for `lambda.requestId`. However, I would prefer for On Demand and Lambda Managed Instance Functions to be consistent. So, both using lambda.requestId is preferred. ## Testing * Added integration tests that checks for logs per invocation request. * Also confirmed in the UI we are seeing the logs for an invocation (before/after screenshot below). <img width="1178" height="467" alt="Screenshot 2025-12-30 at 8 27 34 AM" src="https://github.com/user-attachments/assets/3d9d8c50-6ede-4bae-bcab-a3cf3fed9f32" /> <img width="1178" height="467" alt="Screenshot 2025-12-30 at 8 20 34 AM" src="https://github.com/user-attachments/assets/2e247e6a-bc9c-47c6-8f66-5cf2d57633c6" />
1 parent 82fc356 commit 57315b8

2 files changed

Lines changed: 139 additions & 3 deletions

File tree

bottlecap/src/logs/lambda/processor.rs

Lines changed: 138 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,44 @@ impl LambdaProcessor {
8989
async fn get_message(&mut self, event: TelemetryEvent) -> Result<Message, Box<dyn Error>> {
9090
let copy = event.clone();
9191
match event.record {
92-
TelemetryRecord::Function(v) | TelemetryRecord::Extension(v) => {
92+
TelemetryRecord::Function(v) => {
93+
let (request_id, message) = match v {
94+
serde_json::Value::Object(obj) => {
95+
let request_id = if self.is_managed_instance_mode {
96+
obj.get("requestId")
97+
.or_else(|| obj.get("AWSRequestId"))
98+
.and_then(|v| v.as_str())
99+
.map(ToString::to_string)
100+
} else {
101+
None
102+
};
103+
let msg = Some(serde_json::to_string(&obj).unwrap_or_default());
104+
(request_id, msg)
105+
},
106+
serde_json::Value::String(s) => (None, Some(s)),
107+
_ => (None, None),
108+
};
109+
110+
if let Some(message) = message {
111+
if is_oom_error(&message) {
112+
debug!("LOGS | Got a runtime-specific OOM error. Incrementing OOM metric.");
113+
if let Err(e) = self.event_bus.send(Event::OutOfMemory(event.time.timestamp())).await {
114+
error!("LOGS | Failed to send OOM event to the main event bus: {e}");
115+
}
116+
}
117+
118+
return Ok(Message::new(
119+
message,
120+
request_id,
121+
self.function_arn.clone(),
122+
event.time.timestamp_millis(),
123+
None,
124+
));
125+
}
126+
127+
Err("Unable to parse log".into())
128+
}
129+
TelemetryRecord::Extension(v) => {
93130
let message = match v {
94131
serde_json::Value::Object(obj) => Some(serde_json::to_string(&obj).unwrap_or_default()),
95132
serde_json::Value::String(s) => Some(s),
@@ -1495,4 +1532,104 @@ mod tests {
14951532
assert_eq!(log2.message.lambda.request_id, None);
14961533
assert_eq!(processor.orphan_logs.len(), 0);
14971534
}
1535+
1536+
#[tokio::test]
1537+
async fn test_lmi_extracts_request_id_from_function_json() {
1538+
let tags = HashMap::from([("test".to_string(), "tags".to_string())]);
1539+
let config = Arc::new(config::Config {
1540+
service: Some("test-service".to_string()),
1541+
tags: tags.clone(),
1542+
..config::Config::default()
1543+
});
1544+
1545+
let tags_provider = Arc::new(provider::Provider::new(
1546+
Arc::clone(&config),
1547+
LAMBDA_RUNTIME_SLUG.to_string(),
1548+
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
1549+
));
1550+
1551+
let (tx, _) = tokio::sync::mpsc::channel(2);
1552+
1553+
let mut processor = LambdaProcessor::new(
1554+
tags_provider,
1555+
Arc::new(config::Config {
1556+
service: Some("test-service".to_string()),
1557+
tags,
1558+
..config::Config::default()
1559+
}),
1560+
tx.clone(),
1561+
true, // LMI mode
1562+
);
1563+
1564+
// Test with "requestId" field
1565+
let mut obj = serde_json::Map::new();
1566+
obj.insert(
1567+
"requestId".to_string(),
1568+
Value::String("test-request-123".to_string()),
1569+
);
1570+
obj.insert(
1571+
"message".to_string(),
1572+
Value::String("Hello World".to_string()),
1573+
);
1574+
1575+
let event = TelemetryEvent {
1576+
time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(),
1577+
record: TelemetryRecord::Function(Value::Object(obj)),
1578+
};
1579+
1580+
let result = processor.get_message(event).await.unwrap();
1581+
assert_eq!(
1582+
result.lambda.request_id,
1583+
Some("test-request-123".to_string())
1584+
);
1585+
}
1586+
1587+
#[tokio::test]
1588+
async fn test_regular_lambda_does_not_extract_request_id() {
1589+
let tags = HashMap::from([("test".to_string(), "tags".to_string())]);
1590+
let config = Arc::new(config::Config {
1591+
service: Some("test-service".to_string()),
1592+
tags: tags.clone(),
1593+
..config::Config::default()
1594+
});
1595+
1596+
let tags_provider = Arc::new(provider::Provider::new(
1597+
Arc::clone(&config),
1598+
LAMBDA_RUNTIME_SLUG.to_string(),
1599+
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
1600+
));
1601+
1602+
let (tx, _) = tokio::sync::mpsc::channel(2);
1603+
1604+
let mut processor = LambdaProcessor::new(
1605+
tags_provider,
1606+
Arc::new(config::Config {
1607+
service: Some("test-service".to_string()),
1608+
tags,
1609+
..config::Config::default()
1610+
}),
1611+
tx.clone(),
1612+
false, // Regular Lambda mode (not LMI)
1613+
);
1614+
1615+
// Test that requestId is NOT extracted in regular Lambda mode
1616+
let mut obj = serde_json::Map::new();
1617+
obj.insert(
1618+
"requestId".to_string(),
1619+
Value::String("test-request-789".to_string()),
1620+
);
1621+
obj.insert(
1622+
"message".to_string(),
1623+
Value::String("Hello World".to_string()),
1624+
);
1625+
1626+
let event = TelemetryEvent {
1627+
time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(),
1628+
record: TelemetryRecord::Function(Value::Object(obj)),
1629+
};
1630+
1631+
let result = processor.get_message(event).await.unwrap();
1632+
// Should be None because we're not in LMI mode
1633+
assert_eq!(result.lambda.request_id, None);
1634+
}
14981635
}

integration-tests/tests/lmi.test.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ describe('LMI Integration Tests', () => {
3434
expect(results[runtime].logs!.length).toBeGreaterThan(0);
3535
});
3636

37-
// SVLS-8231
38-
test.failing('should have "Hello World!" log message', () => {
37+
it('should have "Hello World!" log message', () => {
3938
expect(results[runtime].logs).toBeDefined();
4039
const helloWorldLog = results[runtime].logs!.find((log: any) =>
4140
log.message && log.message.includes('Hello World!')

0 commit comments

Comments
 (0)