Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 138 additions & 1 deletion bottlecap/src/logs/lambda/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,44 @@ impl LambdaProcessor {
async fn get_message(&mut self, event: TelemetryEvent) -> Result<Message, Box<dyn Error>> {
let copy = event.clone();
match event.record {
TelemetryRecord::Function(v) | TelemetryRecord::Extension(v) => {
TelemetryRecord::Function(v) => {
let (request_id, message) = match v {
serde_json::Value::Object(obj) => {
let request_id = if self.is_managed_instance_mode {
obj.get("requestId")
.or_else(|| obj.get("AWSRequestId"))
.and_then(|v| v.as_str())
.map(ToString::to_string)
} else {
None
};
let msg = Some(serde_json::to_string(&obj).unwrap_or_default());
(request_id, msg)
},
serde_json::Value::String(s) => (None, Some(s)),
_ => (None, None),
};

if let Some(message) = message {
if is_oom_error(&message) {
debug!("LOGS | Got a runtime-specific OOM error. Incrementing OOM metric.");
if let Err(e) = self.event_bus.send(Event::OutOfMemory(event.time.timestamp())).await {
error!("LOGS | Failed to send OOM event to the main event bus: {e}");
}
}
Comment on lines +111 to +116
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can delete the if is_oom_error(&message) {} block in the TelemetryRecord::Extension case below. I think runtime-specific OOM errors can only come from the function.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we able to validate that? I would have though that extensions can also have an OOM error.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will leave for now to be safe.


return Ok(Message::new(
message,
request_id,
self.function_arn.clone(),
event.time.timestamp_millis(),
None,
));
}

Err("Unable to parse log".into())
}
TelemetryRecord::Extension(v) => {
let message = match v {
serde_json::Value::Object(obj) => Some(serde_json::to_string(&obj).unwrap_or_default()),
serde_json::Value::String(s) => Some(s),
Expand Down Expand Up @@ -1495,4 +1532,104 @@ mod tests {
assert_eq!(log2.message.lambda.request_id, None);
assert_eq!(processor.orphan_logs.len(), 0);
}

#[tokio::test]
async fn test_lmi_extracts_request_id_from_function_json() {
let tags = HashMap::from([("test".to_string(), "tags".to_string())]);
let config = Arc::new(config::Config {
service: Some("test-service".to_string()),
tags: tags.clone(),
..config::Config::default()
});

let tags_provider = Arc::new(provider::Provider::new(
Arc::clone(&config),
LAMBDA_RUNTIME_SLUG.to_string(),
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
));

let (tx, _) = tokio::sync::mpsc::channel(2);

let mut processor = LambdaProcessor::new(
tags_provider,
Arc::new(config::Config {
service: Some("test-service".to_string()),
tags,
..config::Config::default()
}),
tx.clone(),
true, // LMI mode
);

// Test with "requestId" field
let mut obj = serde_json::Map::new();
obj.insert(
"requestId".to_string(),
Value::String("test-request-123".to_string()),
);
obj.insert(
"message".to_string(),
Value::String("Hello World".to_string()),
);

let event = TelemetryEvent {
time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(),
record: TelemetryRecord::Function(Value::Object(obj)),
};

let result = processor.get_message(event).await.unwrap();
assert_eq!(
result.lambda.request_id,
Some("test-request-123".to_string())
);
}

#[tokio::test]
async fn test_regular_lambda_does_not_extract_request_id() {
let tags = HashMap::from([("test".to_string(), "tags".to_string())]);
let config = Arc::new(config::Config {
service: Some("test-service".to_string()),
tags: tags.clone(),
..config::Config::default()
});

let tags_provider = Arc::new(provider::Provider::new(
Arc::clone(&config),
LAMBDA_RUNTIME_SLUG.to_string(),
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
));

let (tx, _) = tokio::sync::mpsc::channel(2);

let mut processor = LambdaProcessor::new(
tags_provider,
Arc::new(config::Config {
service: Some("test-service".to_string()),
tags,
..config::Config::default()
}),
tx.clone(),
false, // Regular Lambda mode (not LMI)
);

// Test that requestId is NOT extracted in regular Lambda mode
let mut obj = serde_json::Map::new();
obj.insert(
"requestId".to_string(),
Value::String("test-request-789".to_string()),
);
obj.insert(
"message".to_string(),
Value::String("Hello World".to_string()),
);

let event = TelemetryEvent {
time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(),
record: TelemetryRecord::Function(Value::Object(obj)),
};

let result = processor.get_message(event).await.unwrap();
// Should be None because we're not in LMI mode
assert_eq!(result.lambda.request_id, None);
}
}
3 changes: 1 addition & 2 deletions integration-tests/tests/lmi.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ describe('LMI Integration Tests', () => {
expect(results[runtime].logs!.length).toBeGreaterThan(0);
});

// SVLS-8231
test.failing('should have "Hello World!" log message', () => {
it('should have "Hello World!" log message', () => {
expect(results[runtime].logs).toBeDefined();
const helloWorldLog = results[runtime].logs!.find((log: any) =>
log.message && log.message.includes('Hello World!')
Expand Down
Loading