Skip to content

Commit 0ca047e

Browse files
lym953 and claude
committed
feat(logs): detect durable function context from aws.lambda span tags
When an aws.lambda span (resource = dd-tracer-serverless-span) carries durable_function_execution_id and durable_function_execution_name meta tags, forward the context to the logs agent via a dedicated mpsc channel so held logs can be tagged and released. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 98d76af commit 0ca047e

5 files changed

Lines changed: 141 additions & 90 deletions

File tree

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use bottlecap::{
4646
},
4747
logger,
4848
logs::{
49-
agent::LogsAgent,
49+
agent::{DurableContextUpdate, LogsAgent},
5050
aggregator_service::{
5151
AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService,
5252
},
@@ -291,14 +291,19 @@ async fn extension_loop_active(
291291
let account_id = r.account_id.as_ref().unwrap_or(&"none".to_string()).clone();
292292
let tags_provider = setup_tag_provider(&Arc::clone(&aws_config), config, &account_id);
293293

294-
let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
295-
start_logs_agent(
296-
config,
297-
Arc::clone(&api_key_factory),
298-
&tags_provider,
299-
event_bus_tx.clone(),
300-
aws_config.is_managed_instance_mode(),
301-
);
294+
let (
295+
logs_agent_channel,
296+
logs_flusher,
297+
logs_agent_cancel_token,
298+
logs_aggregator_handle,
299+
durable_context_tx,
300+
) = start_logs_agent(
301+
config,
302+
Arc::clone(&api_key_factory),
303+
&tags_provider,
304+
event_bus_tx.clone(),
305+
aws_config.is_managed_instance_mode(),
306+
);
302307

303308
let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) =
304309
start_dogstatsd(tags_provider.clone(), Arc::clone(&api_key_factory), config).await;
@@ -344,6 +349,7 @@ async fn extension_loop_active(
344349
&tags_provider,
345350
invocation_processor_handle.clone(),
346351
appsec_processor.clone(),
352+
durable_context_tx,
347353
);
348354

349355
let api_runtime_proxy_shutdown_signal = start_api_runtime_proxy(
@@ -1024,14 +1030,15 @@ fn start_logs_agent(
10241030
LogsFlusher,
10251031
CancellationToken,
10261032
LogsAggregatorHandle,
1033+
Sender<DurableContextUpdate>,
10271034
) {
10281035
let (aggregator_service, aggregator_handle) = LogsAggregatorService::default();
10291036
// Start service in background
10301037
tokio::spawn(async move {
10311038
aggregator_service.run().await;
10321039
});
10331040

1034-
let (mut agent, tx) = LogsAgent::new(
1041+
let (mut agent, tx, durable_context_tx) = LogsAgent::new(
10351042
Arc::clone(tags_provider),
10361043
Arc::clone(config),
10371044
event_bus,
@@ -1048,7 +1055,7 @@ fn start_logs_agent(
10481055
});
10491056

10501057
let flusher = LogsFlusher::new(api_key_factory, aggregator_handle.clone(), config.clone());
1051-
(tx, flusher, cancel_token, aggregator_handle)
1058+
(tx, flusher, cancel_token, aggregator_handle, durable_context_tx)
10521059
}
10531060

10541061
#[allow(clippy::type_complexity)]
@@ -1058,6 +1065,7 @@ fn start_trace_agent(
10581065
tags_provider: &Arc<TagProvider>,
10591066
invocation_processor_handle: InvocationProcessorHandle,
10601067
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
1068+
durable_context_tx: Sender<DurableContextUpdate>,
10611069
) -> (
10621070
Sender<SendDataBuilderInfo>,
10631071
Arc<trace_flusher::TraceFlusher>,
@@ -1130,6 +1138,7 @@ fn start_trace_agent(
11301138
Arc::clone(tags_provider),
11311139
stats_concentrator_handle.clone(),
11321140
span_dedup_handle,
1141+
durable_context_tx,
11331142
);
11341143
let trace_agent_channel = trace_agent.get_sender_copy();
11351144
let shutdown_token = trace_agent.shutdown_token();

bottlecap/src/logs/agent.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,13 @@ use crate::{LAMBDA_RUNTIME_SLUG, config};
1212

1313
const DRAIN_LOG_INTERVAL: Duration = Duration::from_millis(100);
1414

/// `(request_id, execution_id, execution_name)` extracted from an `aws.lambda` span
/// and forwarded to the logs agent over a dedicated mpsc channel so held logs for
/// that request can be tagged and released.
pub type DurableContextUpdate = (String, String, String);
17+
1518
#[allow(clippy::module_name_repetitions)]
1619
pub struct LogsAgent {
1720
rx: mpsc::Receiver<TelemetryEvent>,
21+
durable_context_rx: mpsc::Receiver<DurableContextUpdate>,
1822
processor: LogsProcessor,
1923
aggregator_handle: AggregatorHandle,
2024
cancel_token: CancellationToken,
@@ -28,7 +32,7 @@ impl LogsAgent {
2832
event_bus: Sender<Event>,
2933
aggregator_handle: AggregatorHandle,
3034
is_managed_instance_mode: bool,
31-
) -> (Self, Sender<TelemetryEvent>) {
35+
) -> (Self, Sender<TelemetryEvent>, Sender<DurableContextUpdate>) {
3236
let processor = LogsProcessor::new(
3337
Arc::clone(&datadog_config),
3438
tags_provider,
@@ -38,16 +42,18 @@ impl LogsAgent {
3842
);
3943

4044
let (tx, rx) = mpsc::channel::<TelemetryEvent>(1000);
45+
let (durable_context_tx, durable_context_rx) = mpsc::channel::<DurableContextUpdate>(100);
4146
let cancel_token = CancellationToken::new();
4247

4348
let agent = Self {
4449
rx,
50+
durable_context_rx,
4551
processor,
4652
aggregator_handle,
4753
cancel_token,
4854
};
4955

50-
(agent, tx)
56+
(agent, tx, durable_context_tx)
5157
}
5258

5359
pub async fn spin(&mut self) {
@@ -56,6 +62,13 @@ impl LogsAgent {
5662
Some(event) = self.rx.recv() => {
5763
self.processor.process(event, &self.aggregator_handle).await;
5864
}
65+
Some((request_id, execution_id, execution_name)) = self.durable_context_rx.recv() => {
66+
self.processor.update_durable_map(&request_id, &execution_id, &execution_name);
67+
let ready = self.processor.take_ready_logs();
68+
if !ready.is_empty() {
69+
let _ = self.aggregator_handle.insert_batch(ready);
70+
}
71+
}
5972
() = self.cancel_token.cancelled() => {
6073
debug!("LOGS_AGENT | Received shutdown signal, draining remaining events");
6174

bottlecap/src/logs/lambda/processor.rs

Lines changed: 57 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ impl LambdaProcessor {
208208
let rv = runtime_version.unwrap_or("?".to_string()); // TODO: check what does containers display
209209
let rv_arn = runtime_version_arn.unwrap_or("?".to_string()); // TODO: check what do containers display
210210

211-
self.is_durable_function = Some(rv.contains("DurableFunction"));
211+
let is_durable = rv.contains("DurableFunction");
212+
self.is_durable_function = Some(is_durable);
212213
self.resolve_held_logs_on_durable_function_set();
213214

214215
Ok(Message::new(
@@ -493,37 +494,35 @@ impl LambdaProcessor {
493494
}
494495
}
495496

496-
/// Parses `message` as JSON and, if it contains both `durable_execution_id` and
497-
/// `durable_execution_name` fields, inserts or updates the entry for `request_id` in the
498-
/// durable ID map (evicting the oldest entry when the map is at capacity 5).
499-
///
500-
/// Returns `true` if a brand-new entry was added (the caller may then drain `held_logs`
501-
/// for that `request_id`).
502-
fn try_update_durable_map(&mut self, request_id: &str, message: &str) -> bool {
503-
let Ok(serde_json::Value::Object(obj)) = serde_json::from_str(message) else {
504-
return false;
505-
};
506-
let execution_id = obj
507-
.get("durable_execution_id")
508-
.and_then(serde_json::Value::as_str);
509-
let execution_name = obj
510-
.get("durable_execution_name")
511-
.and_then(serde_json::Value::as_str);
512-
if let (Some(id), Some(name)) = (execution_id, execution_name) {
513-
let is_new = !self.durable_id_map.contains_key(request_id);
514-
if is_new {
515-
if self.durable_id_order.len() >= DURABLE_ID_MAP_CAPACITY
516-
&& let Some(oldest) = self.durable_id_order.pop_front()
517-
{
518-
self.durable_id_map.remove(&oldest);
519-
}
520-
self.durable_id_order.push_back(request_id.to_string());
497+
/// Records the durable execution context for a `request_id`, received from the `aws.lambda`
498+
/// span. Evicts the oldest entry when the map is at capacity. If a new entry is added and
499+
/// `is_durable_function` is already `Some(true)`, drains any held logs for that `request_id`.
500+
pub fn update_durable_map(
501+
&mut self,
502+
request_id: &str,
503+
execution_id: &str,
504+
execution_name: &str,
505+
) {
506+
let is_new = !self.durable_id_map.contains_key(request_id);
507+
if is_new {
508+
if self.durable_id_order.len() >= DURABLE_ID_MAP_CAPACITY
509+
&& let Some(oldest) = self.durable_id_order.pop_front()
510+
{
511+
self.durable_id_map.remove(&oldest);
521512
}
522-
self.durable_id_map
523-
.insert(request_id.to_string(), (id.to_string(), name.to_string()));
524-
return is_new;
513+
self.durable_id_order.push_back(request_id.to_string());
514+
}
515+
self.durable_id_map.insert(
516+
request_id.to_string(),
517+
(execution_id.to_string(), execution_name.to_string()),
518+
);
519+
if is_new && self.is_durable_function == Some(true) {
520+
self.drain_held_for_request_id(request_id);
525521
}
526-
false
522+
}
523+
524+
pub fn take_ready_logs(&mut self) -> Vec<String> {
525+
std::mem::take(&mut self.ready_logs)
527526
}
528527

529528
/// Moves all logs held for `request_id` into `ready_logs`, tagging each with the
@@ -567,28 +566,23 @@ impl LambdaProcessor {
567566
}
568567
}
569568
Some(true) => {
569+
// Durable context is populated by span processing, not from log messages.
570+
// Flush any held logs whose request_id is already in the map; keep the rest.
570571
for (request_id, logs) in held {
571-
// Try to discover durable context from the held logs themselves.
572-
for log in &logs {
573-
self.try_update_durable_map(&request_id, &log.message.message);
574-
}
575572
let tags_suffix = self.durable_id_map.get(&request_id).map(|(id, name)| {
576573
format!(",durable_execution_id:{id},durable_execution_name:{name}")
577574
});
578575
// Borrow of durable_id_map released here.
579-
match tags_suffix {
580-
Some(suffix) => {
581-
for mut log in logs {
582-
log.tags.push_str(&suffix);
583-
if let Ok(s) = serde_json::to_string(&log) {
584-
self.ready_logs.push(s);
585-
}
576+
if let Some(suffix) = tags_suffix {
577+
for mut log in logs {
578+
log.tags.push_str(&suffix);
579+
if let Ok(s) = serde_json::to_string(&log) {
580+
self.ready_logs.push(s);
586581
}
587582
}
588-
None => {
589-
// No context yet — put back and wait for it to arrive.
590-
self.held_logs.insert(request_id, logs);
591-
}
583+
} else {
584+
// No context yet — wait for the aws.lambda span to arrive.
585+
self.held_logs.insert(request_id, logs);
592586
}
593587
}
594588
}
@@ -611,22 +605,18 @@ impl LambdaProcessor {
611605
/// - `None` → stash in `held_logs[request_id]`; logs without a `request_id` are
612606
/// flushed immediately since they cannot carry durable context.
613607
/// - `Some(false)` → serialize and push straight to `ready_logs`.
614-
/// - `Some(true)` → try to update `durable_id_map` from the log; if a new entry was
615-
/// added, drain `held_logs` for that `request_id`; then flush this log if
616-
/// its `request_id` is in the map, otherwise stash it in `held_logs`.
608+
/// - `Some(true)` → flush if this log's `request_id` is already in `durable_id_map`
609+
/// (context was populated by an `aws.lambda` span); otherwise stash in `held_logs`.
617610
fn queue_log_after_rules(&mut self, mut log: IntakeLog) {
618611
match self.is_durable_function {
619612
None => {
620-
match log.message.lambda.request_id.clone() {
621-
Some(rid) => {
622-
self.held_logs.entry(rid).or_default().push(log);
623-
}
624-
None => {
625-
// No request_id — cannot associate with durable context; flush now.
626-
if let Ok(s) = serde_json::to_string(&log) {
627-
drop(log);
628-
self.ready_logs.push(s);
629-
}
613+
if let Some(rid) = log.message.lambda.request_id.clone() {
614+
self.held_logs.entry(rid).or_default().push(log);
615+
} else {
616+
// No request_id — cannot associate with durable context; flush now.
617+
if let Ok(s) = serde_json::to_string(&log) {
618+
drop(log);
619+
self.ready_logs.push(s);
630620
}
631621
}
632622
}
@@ -639,14 +629,8 @@ impl LambdaProcessor {
639629
}
640630
}
641631
Some(true) => {
642-
if let Some(rid) = log.message.lambda.request_id.clone()
643-
&& self.try_update_durable_map(&rid, &log.message.message)
644-
{
645-
// New durable context just discovered — drain previously held logs.
646-
self.drain_held_for_request_id(&rid);
647-
}
648-
649-
// Flush this log if its request_id now has durable context; otherwise hold.
632+
// Durable context is populated by span processing (update_durable_map).
633+
// Flush this log if its request_id already has context; otherwise hold.
650634
let durable_tags = log
651635
.message
652636
.lambda
@@ -671,7 +655,7 @@ impl LambdaProcessor {
671655
if let Some(rid) = log.message.lambda.request_id.clone() {
672656
self.held_logs.entry(rid).or_default().push(log);
673657
}
674-
// Logs without a request_id cannot match the durable map; drop them.
658+
// No request_id in durable mode: drop the log.
675659
}
676660
}
677661
}
@@ -1932,7 +1916,7 @@ mod tests {
19321916
},
19331917
};
19341918
let start_msg = processor.get_message(start_event).await.unwrap();
1935-
processor.get_intake_log(start_msg).unwrap();
1919+
processor.get_intake_log(start_msg, false).unwrap();
19361920

19371921
// Test WARN level
19381922
let event = TelemetryEvent {
@@ -2017,7 +2001,7 @@ mod tests {
20172001
},
20182002
};
20192003
let start_msg = processor.get_message(start_event).await.unwrap();
2020-
processor.get_intake_log(start_msg).unwrap();
2004+
processor.get_intake_log(start_msg, false).unwrap();
20212005

20222006
// JSON without level field
20232007
let event = TelemetryEvent {
@@ -2058,7 +2042,7 @@ mod tests {
20582042
},
20592043
};
20602044
let start_msg = processor.get_message(start_event).await.unwrap();
2061-
processor.get_intake_log(start_msg).unwrap();
2045+
processor.get_intake_log(start_msg, false).unwrap();
20622046

20632047
// JSON with unrecognized level
20642048
let event = TelemetryEvent {
@@ -2189,7 +2173,7 @@ mod tests {
21892173
},
21902174
};
21912175
let start_msg = processor.get_message(start_event).await.unwrap();
2192-
processor.get_intake_log(start_msg).unwrap();
2176+
processor.get_intake_log(start_msg, false).unwrap();
21932177

21942178
// level as a number
21952179
let event = TelemetryEvent {
@@ -2252,7 +2236,7 @@ mod tests {
22522236
},
22532237
};
22542238
let start_msg = processor.get_message(start_event).await.unwrap();
2255-
processor.get_intake_log(start_msg).unwrap();
2239+
processor.get_intake_log(start_msg, false).unwrap();
22562240

22572241
// JSON log with both ddtags and level
22582242
let event = TelemetryEvent {
@@ -2301,7 +2285,7 @@ mod tests {
23012285
},
23022286
};
23032287
let start_msg = processor.get_message(start_event).await.unwrap();
2304-
processor.get_intake_log(start_msg).unwrap();
2288+
processor.get_intake_log(start_msg, false).unwrap();
23052289

23062290
// JSON log with "status" field instead of "level" (Datadog convention)
23072291
let event = TelemetryEvent {

bottlecap/src/logs/processor.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,25 @@ impl LogsProcessor {
4141
}
4242
}
4343
}
44+
45+
pub fn update_durable_map(
46+
&mut self,
47+
request_id: &str,
48+
execution_id: &str,
49+
execution_name: &str,
50+
) {
51+
match self {
52+
LogsProcessor::Lambda(p) => {
53+
p.update_durable_map(request_id, execution_id, execution_name);
54+
}
55+
}
56+
}
57+
58+
pub fn take_ready_logs(&mut self) -> Vec<String> {
59+
match self {
60+
LogsProcessor::Lambda(p) => p.take_ready_logs(),
61+
}
62+
}
4463
}
4564

4665
#[allow(clippy::module_name_repetitions)]

0 commit comments

Comments (0)