Skip to content

Commit 90c56d3

Browse files
Merge branch 'main' into shreya.malpani/fix-e2e-test-trigger
2 parents 91de7e6 + b86a291 commit 90c56d3

10 files changed

Lines changed: 298 additions & 10 deletions

File tree

bottlecap/src/appsec/processor/mod.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,18 @@ impl Processor {
151151
/// Returns the first `aws.lambda` span from the provided trace, if one
152152
/// exists.
153153
///
154+
/// Placeholder spans (resource == `INVOCATION_SPAN_RESOURCE`) emitted by
155+
/// Go and Java tracers are excluded: they are always dropped by the chunk
156+
/// processor before reaching the backend, so tagging them would waste the
157+
/// `AppSec` context and trigger a premature context deletion that would leave
158+
/// the real, extension-built `aws.lambda` span untagged.
159+
///
154160
/// # Returns
155161
/// The span on which security information will be attached.
156162
pub fn service_entry_span_mut(trace: &mut [Span]) -> Option<&mut Span> {
157-
trace.iter_mut().find(|span| span.name == "aws.lambda")
163+
trace.iter_mut().find(|span| {
164+
span.name == "aws.lambda" && span.resource != crate::traces::INVOCATION_SPAN_RESOURCE
165+
})
158166
}
159167

160168
/// Processes an intercepted [`Span`].
@@ -812,4 +820,41 @@ mod tests {
812820
result
813821
);
814822
}
823+
824+
#[test]
825+
fn service_entry_span_mut_skips_placeholder_lambda_span() {
826+
let mut trace = vec![
827+
Span {
828+
name: "aws.lambda".into(),
829+
resource: crate::traces::INVOCATION_SPAN_RESOURCE.into(),
830+
span_id: 1,
831+
..Default::default()
832+
},
833+
Span {
834+
name: "aws.lambda".into(),
835+
resource: "real.lambda.invocation".into(),
836+
span_id: 2,
837+
..Default::default()
838+
},
839+
];
840+
841+
let selected = Processor::service_entry_span_mut(&mut trace)
842+
.expect("expected non-placeholder aws.lambda span");
843+
844+
assert_eq!(selected.name, "aws.lambda");
845+
assert_ne!(selected.resource, crate::traces::INVOCATION_SPAN_RESOURCE);
846+
assert_eq!(selected.span_id, 2);
847+
}
848+
849+
#[test]
850+
fn service_entry_span_mut_returns_none_for_only_placeholder() {
851+
let mut trace = vec![Span {
852+
name: "aws.lambda".into(),
853+
resource: crate::traces::INVOCATION_SPAN_RESOURCE.into(),
854+
span_id: 1,
855+
..Default::default()
856+
}];
857+
858+
assert!(Processor::service_entry_span_mut(&mut trace).is_none());
859+
}
815860
}

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,6 @@ impl Processor {
615615
);
616616
}
617617

618-
// todo(duncanista): Add missing metric tags for ASM
619618
// Add dynamic and trigger tags
620619
context
621620
.invocation_span
@@ -626,6 +625,19 @@ impl Processor {
626625
context.invocation_span.meta.extend(trigger_tags);
627626
}
628627

628+
// Ensure _dd.appsec.enabled is present on the invocation span when AAP is enabled.
629+
// complete_inferred_spans (called below) propagates this metric from the invocation
630+
// span to the inferred trigger span. AppSec's process_span will set it again from the
631+
// security context when it runs, but this baseline guarantees the tag is always present
632+
// even when the context cannot be found at flush time.
633+
if self.config.serverless_appsec_enabled {
634+
context
635+
.invocation_span
636+
.metrics
637+
.entry("_dd.appsec.enabled".to_string())
638+
.or_insert(1.0);
639+
}
640+
629641
self.inferrer
630642
.complete_inferred_spans(&context.invocation_span);
631643

@@ -1405,6 +1417,7 @@ impl Processor {
14051417
execution_id: &str,
14061418
execution_name: &str,
14071419
first_invocation: Option<bool>,
1420+
execution_status: Option<String>,
14081421
) {
14091422
if let Err(e) = self
14101423
.durable_context_tx
@@ -1413,6 +1426,7 @@ impl Processor {
14131426
execution_id: execution_id.to_owned(),
14141427
execution_name: execution_name.to_owned(),
14151428
first_invocation,
1429+
execution_status,
14161430
})
14171431
.await
14181432
{
@@ -2332,4 +2346,103 @@ mod tests {
23322346
"no contexts should be ready to send yet"
23332347
);
23342348
}
2349+
2350+
fn setup_appsec() -> Processor {
2351+
let aws_config = Arc::new(AwsConfig {
2352+
region: "us-east-1".into(),
2353+
aws_lwa_proxy_lambda_runtime_api: Some("***".into()),
2354+
function_name: "test-function".into(),
2355+
sandbox_init_time: Instant::now(),
2356+
runtime_api: "***".into(),
2357+
exec_wrapper: None,
2358+
initialization_type: "on-demand".into(),
2359+
});
2360+
let config = Arc::new(config::Config {
2361+
service: Some("test-service".to_string()),
2362+
serverless_appsec_enabled: true,
2363+
..config::Config::default()
2364+
});
2365+
let tags_provider = Arc::new(provider::Provider::new(
2366+
Arc::clone(&config),
2367+
LAMBDA_RUNTIME_SLUG.to_string(),
2368+
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
2369+
));
2370+
let (service, handle) =
2371+
dogstatsd::aggregator::AggregatorService::new(dogstatsd::metric::EMPTY_TAGS, 1024)
2372+
.expect("failed to create aggregator service");
2373+
tokio::spawn(service.run());
2374+
let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
2375+
let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
2376+
Processor::new(
2377+
tags_provider,
2378+
config,
2379+
aws_config,
2380+
handle,
2381+
propagator,
2382+
durable_context_tx,
2383+
)
2384+
}
2385+
2386+
#[tokio::test]
2387+
async fn enrich_ctx_sets_appsec_enabled_when_aap_enabled() {
2388+
let mut p = setup_appsec();
2389+
let request_id = String::from("req-appsec");
2390+
p.on_invoke_event(request_id.clone());
2391+
p.on_platform_start(request_id.clone(), chrono::Utc::now());
2392+
2393+
let ctx = p
2394+
.enrich_ctx_at_platform_done(&request_id, Status::Success)
2395+
.expect("context must be present");
2396+
2397+
assert_eq!(
2398+
ctx.invocation_span.metrics.get("_dd.appsec.enabled"),
2399+
Some(&1.0),
2400+
"_dd.appsec.enabled must be 1.0 when AAP is enabled"
2401+
);
2402+
}
2403+
2404+
#[tokio::test]
2405+
async fn enrich_ctx_does_not_set_appsec_enabled_when_aap_disabled() {
2406+
let mut p = setup();
2407+
let request_id = String::from("req-no-appsec");
2408+
p.on_invoke_event(request_id.clone());
2409+
p.on_platform_start(request_id.clone(), chrono::Utc::now());
2410+
2411+
let ctx = p
2412+
.enrich_ctx_at_platform_done(&request_id, Status::Success)
2413+
.expect("context must be present");
2414+
2415+
assert!(
2416+
!ctx.invocation_span
2417+
.metrics
2418+
.contains_key("_dd.appsec.enabled"),
2419+
"_dd.appsec.enabled must not be set when AAP is disabled"
2420+
);
2421+
}
2422+
2423+
#[tokio::test]
2424+
async fn enrich_ctx_does_not_override_existing_appsec_enabled() {
2425+
let mut p = setup_appsec();
2426+
let request_id = String::from("req-appsec-preset");
2427+
p.on_invoke_event(request_id.clone());
2428+
p.on_platform_start(request_id.clone(), chrono::Utc::now());
2429+
2430+
// Pre-set a different value to verify or_insert does not overwrite it.
2431+
p.context_buffer
2432+
.get_mut(&request_id)
2433+
.expect("context must exist")
2434+
.invocation_span
2435+
.metrics
2436+
.insert("_dd.appsec.enabled".to_string(), 0.0);
2437+
2438+
let ctx = p
2439+
.enrich_ctx_at_platform_done(&request_id, Status::Success)
2440+
.expect("context must be present");
2441+
2442+
assert_eq!(
2443+
ctx.invocation_span.metrics.get("_dd.appsec.enabled"),
2444+
Some(&0.0),
2445+
"pre-existing _dd.appsec.enabled value must not be overwritten"
2446+
);
2447+
}
23352448
}

bottlecap/src/lifecycle/invocation/processor_service.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ pub enum ProcessorCommand {
115115
execution_id: String,
116116
execution_name: String,
117117
first_invocation: Option<bool>,
118+
execution_status: Option<String>,
118119
},
119120
OnOutOfMemoryError {
120121
timestamp: i64,
@@ -391,13 +392,15 @@ impl InvocationProcessorHandle {
391392
execution_id: String,
392393
execution_name: String,
393394
first_invocation: Option<bool>,
395+
execution_status: Option<String>,
394396
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
395397
self.sender
396398
.send(ProcessorCommand::ForwardDurableContext {
397399
request_id,
398400
execution_id,
399401
execution_name,
400402
first_invocation,
403+
execution_status,
401404
})
402405
.await
403406
}
@@ -617,13 +620,15 @@ impl InvocationProcessorService {
617620
execution_id,
618621
execution_name,
619622
first_invocation,
623+
execution_status,
620624
} => {
621625
self.processor
622626
.forward_durable_context(
623627
&request_id,
624628
&execution_id,
625629
&execution_name,
626630
first_invocation,
631+
execution_status,
627632
)
628633
.await;
629634
}

bottlecap/src/logs/lambda/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub struct DurableContextUpdate {
99
pub execution_id: String,
1010
pub execution_name: String,
1111
pub first_invocation: Option<bool>,
12+
pub execution_status: Option<String>,
1213
}
1314

1415
/// Durable execution context stored per `request_id` in `LambdaProcessor::durable_context_map`.
@@ -17,6 +18,7 @@ pub struct DurableExecutionContext {
1718
pub execution_id: String,
1819
pub execution_name: String,
1920
pub first_invocation: Option<bool>,
21+
pub execution_status: Option<String>,
2022
}
2123

2224
///
@@ -66,6 +68,11 @@ pub struct Lambda {
6668
skip_serializing_if = "Option::is_none"
6769
)]
6870
pub first_invocation: Option<bool>,
71+
#[serde(
72+
rename = "durable_function.execution_status",
73+
skip_serializing_if = "Option::is_none"
74+
)]
75+
pub durable_execution_status: Option<String>,
6976
}
7077

7178
impl Message {

bottlecap/src/logs/lambda/processor.rs

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -546,10 +546,11 @@ impl LambdaProcessor {
546546
/// `request_id`.
547547
pub fn insert_to_durable_context_map(
548548
&mut self,
549-
request_id: &str, // key
550-
execution_id: &str, // value
551-
execution_name: &str, // value
552-
first_invocation: Option<bool>, // value
549+
request_id: &str, // key
550+
execution_id: &str, // value
551+
execution_name: &str, // value
552+
first_invocation: Option<bool>, // value
553+
execution_status: Option<String>, // value
553554
) {
554555
if self.durable_context_map.contains_key(request_id) {
555556
error!("LOGS | insert_to_durable_context_map: request_id={request_id} already in map");
@@ -567,6 +568,7 @@ impl LambdaProcessor {
567568
execution_id: execution_id.to_string(),
568569
execution_name: execution_name.to_string(),
569570
first_invocation,
571+
execution_status,
570572
},
571573
);
572574
self.drain_held_logs_for_request_id(request_id);
@@ -660,6 +662,12 @@ impl LambdaProcessor {
660662
if is_platform_log(&log.message.message) {
661663
log.message.lambda.first_invocation = ctx.first_invocation;
662664
}
665+
if log.message.message.starts_with("END RequestId:") {
666+
log.message
667+
.lambda
668+
.durable_execution_status
669+
.clone_from(&ctx.execution_status);
670+
}
663671
if let Ok(s) = serde_json::to_string(&log) {
664672
// explicitly drop log so we don't accidentally re-use it and push
665673
// duplicate logs to the aggregator
@@ -2578,4 +2586,41 @@ mod tests {
25782586
let batches = aggregator_handle.get_batches().await.unwrap();
25792587
assert!(batches.is_empty());
25802588
}
2589+
2590+
#[tokio::test]
2591+
async fn test_execution_status_on_end_log() {
2592+
for (execution_status, expected) in [
2593+
(Some("SUCCEEDED"), serde_json::json!("SUCCEEDED")),
2594+
(None, serde_json::Value::Null),
2595+
] {
2596+
let mut processor = make_processor_for_durable_arn_tests();
2597+
processor.is_durable_function = Some(true);
2598+
processor.invocation_context.request_id = "req-end".to_string();
2599+
processor.insert_to_durable_context_map(
2600+
"req-end",
2601+
"exec-id-123",
2602+
"exec-name-abc",
2603+
Some(false),
2604+
execution_status.map(str::to_string),
2605+
);
2606+
let event = TelemetryEvent {
2607+
time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(),
2608+
record: TelemetryRecord::PlatformRuntimeDone {
2609+
request_id: "req-end".to_string(),
2610+
status: Status::Success,
2611+
error_type: None,
2612+
metrics: None,
2613+
},
2614+
};
2615+
let (aggregator_service, aggregator_handle) = AggregatorService::default();
2616+
tokio::spawn(async move { aggregator_service.run().await });
2617+
processor.process(event, &aggregator_handle).await;
2618+
let batches = aggregator_handle.get_batches().await.unwrap();
2619+
let logs: Vec<serde_json::Value> = serde_json::from_slice(&batches[0]).unwrap();
2620+
assert_eq!(
2621+
logs[0]["message"]["lambda"]["durable_function.execution_status"],
2622+
expected
2623+
);
2624+
}
2625+
}
25812626
}

bottlecap/src/logs/processor.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ impl LogsProcessor {
5353
&update.execution_id,
5454
&update.execution_name,
5555
update.first_invocation,
56+
update.execution_status,
5657
);
5758
let ready_logs = self.take_ready_logs();
5859
if !ready_logs.is_empty()
@@ -68,6 +69,7 @@ impl LogsProcessor {
6869
execution_id: &str,
6970
execution_name: &str,
7071
first_invocation: Option<bool>,
72+
execution_status: Option<String>,
7173
) {
7274
match self {
7375
LogsProcessor::Lambda(p) => {
@@ -76,6 +78,7 @@ impl LogsProcessor {
7678
execution_id,
7779
execution_name,
7880
first_invocation,
81+
execution_status,
7982
);
8083
}
8184
}

0 commit comments

Comments
 (0)