Skip to content

Commit 52624e9

Browse files
lym953claude
andcommitted
refactor: route durable context through lifecycle processor instead of directly to logs agent
Removes the direct trace_agent → logs_agent dependency by having the lifecycle processor (Processor) own durable_context_tx and forward durable execution context to the logs pipeline from add_tracer_span(). The trace agent now sends aws.lambda spans to the lifecycle processor (alongside INVOCATION_SPAN_RESOURCE spans) and the lifecycle processor handles the durable context extraction and forwarding. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent b0bdec4 commit 52624e9

5 files changed

Lines changed: 38 additions & 33 deletions

File tree

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ async fn extension_loop_active(
330330
Arc::clone(&aws_config),
331331
metrics_aggregator_handle.clone(),
332332
Arc::clone(&propagator),
333+
durable_context_tx,
333334
);
334335
tokio::spawn(async move {
335336
invocation_processor_service.run().await;
@@ -362,7 +363,6 @@ async fn extension_loop_active(
362363
&tags_provider,
363364
invocation_processor_handle.clone(),
364365
appsec_processor.clone(),
365-
durable_context_tx,
366366
&shared_client,
367367
);
368368

@@ -1091,7 +1091,6 @@ fn start_trace_agent(
10911091
tags_provider: &Arc<TagProvider>,
10921092
invocation_processor_handle: InvocationProcessorHandle,
10931093
appsec_processor: Option<Arc<TokioMutex<AppSecProcessor>>>,
1094-
durable_context_tx: Sender<DurableContextUpdate>,
10951094
client: &Client,
10961095
) -> (
10971096
Sender<SendDataBuilderInfo>,
@@ -1177,7 +1176,6 @@ fn start_trace_agent(
11771176
Arc::clone(tags_provider),
11781177
stats_concentrator_handle.clone(),
11791178
span_dedup_handle,
1180-
durable_context_tx,
11811179
);
11821180
let trace_agent_channel = trace_agent.get_sender_copy();
11831181
let shutdown_token = trace_agent.shutdown_token();

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use serde_json::Value;
1111
use tokio::time::Instant;
1212
use tracing::{debug, trace, warn};
1313

14+
use tokio::sync::mpsc;
15+
1416
use crate::{
1517
config::{self, aws::AwsConfig},
1618
extension::telemetry::events::{
@@ -24,6 +26,7 @@ use crate::{
2426
span_inferrer::{self, SpanInferrer},
2527
triggers::get_default_service_name,
2628
},
29+
logs::agent::DurableContextUpdate,
2730
metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics},
2831
proc::{
2932
self, CPUData, NetworkData,
@@ -88,6 +91,10 @@ pub struct Processor {
8891
/// Tracks whether if first invocation after init has been received in Managed Instance mode.
8992
/// Used to determine if we should search for the empty context on an invocation.
9093
awaiting_first_invocation: bool,
94+
/// Sender used to forward durable execution context extracted from `aws.lambda` spans to the
95+
/// logs pipeline. Decouples the trace agent from the logs agent: the trace agent sends spans
96+
/// to the lifecycle processor, which extracts durable context and relays it here.
97+
durable_context_tx: mpsc::Sender<DurableContextUpdate>,
9198
}
9299

93100
impl Processor {
@@ -98,6 +105,7 @@ impl Processor {
98105
aws_config: Arc<AwsConfig>,
99106
metrics_aggregator: dogstatsd::aggregator::AggregatorHandle,
100107
propagator: Arc<DatadogCompositePropagator>,
108+
durable_context_tx: mpsc::Sender<DurableContextUpdate>,
101109
) -> Self {
102110
let resource = tags_provider
103111
.get_canonical_resource_name()
@@ -127,6 +135,7 @@ impl Processor {
127135
dynamic_tags: HashMap::new(),
128136
active_invocations: 0,
129137
awaiting_first_invocation: false,
138+
durable_context_tx,
130139
}
131140
}
132141

@@ -1337,10 +1346,27 @@ impl Processor {
13371346
///
13381347
/// This is used to enrich the invocation span with additional metadata from the tracers
13391348
/// top level span, since we discard the tracer span when we create the invocation span.
1349+
///
1350+
/// Also forwards durable execution context to the logs pipeline when the span carries
1351+
/// `durable_function_execution_id` and `durable_function_execution_name` metadata.
13401352
pub fn add_tracer_span(&mut self, span: &Span) {
13411353
if let Some(request_id) = span.meta.get("request_id") {
13421354
self.context_buffer.add_tracer_span(request_id, span);
13431355
}
1356+
1357+
if let (Some(request_id), Some(exec_id), Some(exec_name)) = (
1358+
span.meta.get("request_id"),
1359+
span.meta.get("durable_function_execution_id"),
1360+
span.meta.get("durable_function_execution_name"),
1361+
) {
1362+
if let Err(e) = self.durable_context_tx.try_send((
1363+
request_id.clone(),
1364+
exec_id.clone(),
1365+
exec_name.clone(),
1366+
)) {
1367+
warn!("LIFECYCLE | Failed to forward durable context to logs pipeline: {e}");
1368+
}
1369+
}
13441370
}
13451371
}
13461372

@@ -1387,7 +1413,8 @@ mod tests {
13871413
tokio::spawn(service.run());
13881414

13891415
let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
1390-
Processor::new(tags_provider, config, aws_config, handle, propagator)
1416+
let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
1417+
Processor::new(tags_provider, config, aws_config, handle, propagator, durable_context_tx)
13911418
}
13921419

13931420
#[test]
@@ -1924,7 +1951,8 @@ mod tests {
19241951

19251952
let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
19261953

1927-
let processor = Processor::new(tags_provider, config, aws_config, handle, propagator);
1954+
let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
1955+
let processor = Processor::new(tags_provider, config, aws_config, handle, propagator, durable_context_tx);
19281956

19291957
assert!(
19301958
processor.is_managed_instance_mode(),

bottlecap/src/lifecycle/invocation/processor_service.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::{
1717
context::{Context, ReparentingInfo},
1818
processor::Processor,
1919
},
20+
logs::agent::DurableContextUpdate,
2021
tags::provider,
2122
traces::{
2223
context::SpanContext, propagation::DatadogCompositePropagator,
@@ -430,6 +431,7 @@ impl InvocationProcessorService {
430431
aws_config: Arc<AwsConfig>,
431432
metrics_aggregator_handle: AggregatorHandle,
432433
propagator: Arc<DatadogCompositePropagator>,
434+
durable_context_tx: mpsc::Sender<DurableContextUpdate>,
433435
) -> (InvocationProcessorHandle, Self) {
434436
let (sender, receiver) = mpsc::channel(1000);
435437

@@ -439,6 +441,7 @@ impl InvocationProcessorService {
439441
aws_config,
440442
metrics_aggregator_handle,
441443
propagator,
444+
durable_context_tx,
442445
);
443446

444447
let handle = InvocationProcessorHandle { sender };

bottlecap/src/proxy/interceptor.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,13 +499,16 @@ mod tests {
499499
initialization_type: "on-demand".into(),
500500
});
501501
let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
502+
let (durable_context_tx, _durable_context_rx) =
503+
tokio::sync::mpsc::channel(1);
502504
let (invocation_processor_handle, invocation_processor_service) =
503505
InvocationProcessorService::new(
504506
Arc::clone(&tags_provider),
505507
Arc::clone(&config),
506508
Arc::clone(&aws_config),
507509
metrics_aggregator,
508510
Arc::clone(&propagator),
511+
durable_context_tx,
509512
);
510513
tokio::spawn(async move {
511514
invocation_processor_service.run().await;

bottlecap/src/traces/trace_agent.rs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use tokio_util::sync::CancellationToken;
2121
use tower_http::limit::RequestBodyLimitLayer;
2222
use tracing::{debug, error, warn};
2323

24-
use crate::logs::agent::DurableContextUpdate;
2524
use crate::traces::trace_processor::SendingTraceProcessor;
2625
use crate::{
2726
appsec::processor::Processor as AppSecProcessor,
@@ -92,7 +91,6 @@ pub struct TraceState {
9291
pub invocation_processor_handle: InvocationProcessorHandle,
9392
pub tags_provider: Arc<provider::Provider>,
9493
pub span_deduper: DedupHandle,
95-
pub durable_context_tx: Sender<DurableContextUpdate>,
9694
}
9795

9896
#[derive(Clone)]
@@ -120,7 +118,6 @@ pub struct TraceAgent {
120118
tx: Sender<SendDataBuilderInfo>,
121119
stats_concentrator: StatsConcentratorHandle,
122120
span_deduper: DedupHandle,
123-
durable_context_tx: Sender<DurableContextUpdate>,
124121
}
125122

126123
#[derive(Clone, Copy)]
@@ -144,7 +141,6 @@ impl TraceAgent {
144141
tags_provider: Arc<provider::Provider>,
145142
stats_concentrator: StatsConcentratorHandle,
146143
span_deduper: DedupHandle,
147-
durable_context_tx: Sender<DurableContextUpdate>,
148144
) -> TraceAgent {
149145
// Set up a channel to send processed traces to our trace aggregator. tx is passed through each
150146
// endpoint_handler to the trace processor, which uses it to send de-serialized
@@ -174,7 +170,6 @@ impl TraceAgent {
174170
shutdown_token: CancellationToken::new(),
175171
stats_concentrator,
176172
span_deduper,
177-
durable_context_tx,
178173
}
179174
}
180175

@@ -230,7 +225,6 @@ impl TraceAgent {
230225
invocation_processor_handle: self.invocation_processor_handle.clone(),
231226
tags_provider: Arc::clone(&self.tags_provider),
232227
span_deduper: self.span_deduper.clone(),
233-
durable_context_tx: self.durable_context_tx.clone(),
234228
};
235229

236230
let stats_state = StatsState {
@@ -310,7 +304,6 @@ impl TraceAgent {
310304
state.invocation_processor_handle,
311305
state.tags_provider,
312306
state.span_deduper,
313-
state.durable_context_tx,
314307
ApiVersion::V04,
315308
)
316309
.await
@@ -324,7 +317,6 @@ impl TraceAgent {
324317
state.invocation_processor_handle,
325318
state.tags_provider,
326319
state.span_deduper,
327-
state.durable_context_tx,
328320
ApiVersion::V05,
329321
)
330322
.await
@@ -481,7 +473,6 @@ impl TraceAgent {
481473
invocation_processor_handle: InvocationProcessorHandle,
482474
tags_provider: Arc<provider::Provider>,
483475
deduper: DedupHandle,
484-
durable_context_tx: Sender<DurableContextUpdate>,
485476
version: ApiVersion,
486477
) -> Response {
487478
let start = Instant::now();
@@ -593,7 +584,7 @@ impl TraceAgent {
593584
}
594585
}
595586

596-
if span.resource == INVOCATION_SPAN_RESOURCE
587+
if (span.resource == INVOCATION_SPAN_RESOURCE || span.name == "aws.lambda")
597588
&& let Err(e) = invocation_processor_handle
598589
.add_tracer_span(span.clone())
599590
.await
@@ -602,24 +593,6 @@ impl TraceAgent {
602593
}
603594
handle_reparenting(&mut reparenting_info, &mut span);
604595

605-
if span.name == "aws.lambda" {
606-
// If this aws.lambda span carries durable function context, forward it to
607-
// the logs agent so it can release any held logs and set durable execution context on them.
608-
if let (Some(request_id), Some(execution_id), Some(execution_name)) = (
609-
span.meta.get("request_id"),
610-
span.meta.get("durable_function_execution_id"),
611-
span.meta.get("durable_function_execution_name"),
612-
) {
613-
let _ = durable_context_tx
614-
.send((
615-
request_id.clone(),
616-
execution_id.clone(),
617-
execution_name.clone(),
618-
))
619-
.await;
620-
}
621-
}
622-
623596
// Keep the span
624597
chunk.push(span);
625598
}

0 commit comments

Comments
 (0)