@@ -49,6 +49,11 @@ use crate::{
4949pub const MS_TO_NS : f64 = 1_000_000.0 ;
5050pub const S_TO_MS : u64 = 1_000 ;
5151pub const S_TO_NS : f64 = 1_000_000_000.0 ;
52+ /// Threshold for classifying a Lambda cold start as proactive initialization.
53+ ///
54+ /// Proactive initialization is a Lambda optimization where the runtime pre-initializes
55+ /// a sandbox before any invocation is scheduled, to reduce cold start latency for future
56+ /// requests.
5257pub const PROACTIVE_INITIALIZATION_THRESHOLD_MS : u64 = 10_000 ;
5358
5459pub const DATADOG_INVOCATION_ERROR_MESSAGE_KEY : & str = "x-datadog-invocation-error-msg" ;
@@ -96,6 +101,8 @@ pub struct Processor {
96101 /// logs agent. Decouples the trace agent from the logs agent: the trace agent sends spans
97102 /// to the lifecycle processor, which extracts durable context and relays it here.
98103 durable_context_tx : mpsc:: Sender < DurableContextUpdate > ,
104+ /// Time of the `SnapStart` restore event, set when `PlatformRestoreStart` is received.
105+ restore_time : Option < Instant > ,
99106}
100107
101108impl Processor {
@@ -137,6 +144,7 @@ impl Processor {
137144 active_invocations : 0 ,
138145 awaiting_first_invocation : false ,
139146 durable_context_tx,
147+ restore_time : None ,
140148 }
141149 }
142150
@@ -252,12 +260,35 @@ impl Processor {
252260
253261 // If it's empty, then we are in a cold start
254262 if self . context_buffer . is_empty ( ) {
255- let now = Instant :: now ( ) ;
256- let time_since_sandbox_init = now. duration_since ( self . aws_config . sandbox_init_time ) ;
257- if time_since_sandbox_init. as_millis ( ) > PROACTIVE_INITIALIZATION_THRESHOLD_MS . into ( ) {
258- proactive_initialization = true ;
263+ if self . aws_config . is_snapstart ( ) {
264+ match self . restore_time {
265+ None => {
266+ // PlatformRestoreStart hasn't arrived yet — restore and invoke
267+ // happened close together, so this is a cold start (not proactive).
268+ cold_start = true ;
269+ }
270+ Some ( restore_time) => {
271+ let now = Instant :: now ( ) ;
272+ let time_since_restore = now. duration_since ( restore_time) ;
273+ if time_since_restore. as_millis ( )
274+ > PROACTIVE_INITIALIZATION_THRESHOLD_MS . into ( )
275+ {
276+ proactive_initialization = true ;
277+ } else {
278+ cold_start = true ;
279+ }
280+ }
281+ }
259282 } else {
260- cold_start = true ;
283+ let now = Instant :: now ( ) ;
284+ let time_since_sandbox_init = now. duration_since ( self . aws_config . sandbox_init_time ) ;
285+ if time_since_sandbox_init. as_millis ( )
286+ > PROACTIVE_INITIALIZATION_THRESHOLD_MS . into ( )
287+ {
288+ proactive_initialization = true ;
289+ } else {
290+ cold_start = true ;
291+ }
261292 }
262293
263294 // Resolve runtime only once
@@ -383,6 +414,8 @@ impl Processor {
383414 /// This is used to create a `snapstart_restore` span, since this telemetry event does not
384415 /// provide a `request_id`, we try to guess which invocation is the restore similar to init.
385416 pub fn on_platform_restore_start ( & mut self , time : DateTime < Utc > ) {
417+ self . restore_time = Some ( Instant :: now ( ) ) ;
418+
386419 let start_time: i64 = SystemTime :: from ( time)
387420 . duration_since ( UNIX_EPOCH )
388421 . expect ( "time went backwards" )
@@ -620,16 +653,9 @@ impl Processor {
620653 trace_sender : & Arc < SendingTraceProcessor > ,
621654 context : Context ,
622655 ) {
623- let client_computed_stats = context. client_computed_stats ;
624656 let ( traces, body_size) = self . get_ctx_spans ( context) ;
625- self . send_spans (
626- traces,
627- body_size,
628- client_computed_stats,
629- tags_provider,
630- trace_sender,
631- )
632- . await ;
657+ self . send_spans ( traces, body_size, tags_provider, trace_sender)
658+ . await ;
633659 }
634660
635661 fn get_ctx_spans ( & mut self , context : Context ) -> ( Vec < Span > , usize ) {
@@ -694,7 +720,7 @@ impl Processor {
694720 let traces = vec ! [ cold_start_span. clone( ) ] ;
695721 let body_size = size_of_val ( cold_start_span) ;
696722
697- self . send_spans ( traces, body_size, false , tags_provider, trace_sender)
723+ self . send_spans ( traces, body_size, tags_provider, trace_sender)
698724 . await ;
699725 }
700726 }
@@ -706,7 +732,6 @@ impl Processor {
706732 & mut self ,
707733 traces : Vec < Span > ,
708734 body_size : usize ,
709- client_computed_stats : bool ,
710735 tags_provider : & Arc < provider:: Provider > ,
711736 trace_sender : & Arc < SendingTraceProcessor > ,
712737 ) {
@@ -719,7 +744,7 @@ impl Processor {
719744 tracer_version : "" ,
720745 container_id : "" ,
721746 client_computed_top_level : false ,
722- client_computed_stats,
747+ client_computed_stats : false ,
723748 dropped_p0_traces : 0 ,
724749 dropped_p0_spans : 0 ,
725750 } ;
@@ -1366,10 +1391,9 @@ impl Processor {
13661391 ///
13671392 /// This is used to enrich the invocation span with additional metadata from the tracers
13681393 /// top level span, since we discard the tracer span when we create the invocation span.
1369- pub fn add_tracer_span ( & mut self , span : & Span , client_computed_stats : bool ) {
1394+ pub fn add_tracer_span ( & mut self , span : & Span ) {
13701395 if let Some ( request_id) = span. meta . get ( "request_id" ) {
1371- self . context_buffer
1372- . add_tracer_span ( request_id, span, client_computed_stats) ;
1396+ self . context_buffer . add_tracer_span ( request_id, span) ;
13731397 }
13741398 }
13751399
@@ -2180,98 +2204,6 @@ mod tests {
21802204 ) ;
21812205 }
21822206
2183- /// Verifies that `client_computed_stats` set on a context via `add_tracer_span` is
2184- /// propagated all the way through `send_ctx_spans` to the `aws.lambda` payload sent
2185- /// to the backend, so the extension does not generate duplicate stats.
2186- #[ tokio:: test]
2187- #[ allow( clippy:: unwrap_used) ]
2188- async fn test_client_computed_stats_propagated_to_aws_lambda_span ( ) {
2189- use crate :: traces:: stats_concentrator_service:: StatsConcentratorService ;
2190- use crate :: traces:: stats_generator:: StatsGenerator ;
2191- use libdd_trace_obfuscation:: obfuscation_config:: ObfuscationConfig ;
2192- use tokio:: sync:: mpsc;
2193-
2194- let config = Arc :: new ( config:: Config {
2195- apm_dd_url : "https://trace.agent.datadoghq.com" . to_string ( ) ,
2196- ..config:: Config :: default ( )
2197- } ) ;
2198- let tags_provider = Arc :: new ( provider:: Provider :: new (
2199- Arc :: clone ( & config) ,
2200- LAMBDA_RUNTIME_SLUG . to_string ( ) ,
2201- & HashMap :: from ( [ ( "function_arn" . to_string ( ) , "test-arn" . to_string ( ) ) ] ) ,
2202- ) ) ;
2203- let aws_config = Arc :: new ( AwsConfig {
2204- region : "us-east-1" . into ( ) ,
2205- aws_lwa_proxy_lambda_runtime_api : None ,
2206- function_name : "test-function" . into ( ) ,
2207- sandbox_init_time : Instant :: now ( ) ,
2208- runtime_api : "***" . into ( ) ,
2209- exec_wrapper : None ,
2210- initialization_type : "on-demand" . into ( ) ,
2211- } ) ;
2212- let ( aggregator_service, aggregator_handle) =
2213- AggregatorService :: new ( EMPTY_TAGS , 1024 ) . expect ( "failed to create aggregator service" ) ;
2214- tokio:: spawn ( aggregator_service. run ( ) ) ;
2215- let propagator = Arc :: new ( DatadogCompositePropagator :: new ( Arc :: clone ( & config) ) ) ;
2216- let ( durable_context_tx, _) = tokio:: sync:: mpsc:: channel ( 1 ) ;
2217- let mut p = Processor :: new (
2218- Arc :: clone ( & tags_provider) ,
2219- Arc :: clone ( & config) ,
2220- aws_config,
2221- aggregator_handle,
2222- propagator,
2223- durable_context_tx,
2224- ) ;
2225-
2226- let ( trace_tx, mut trace_rx) = mpsc:: channel ( 10 ) ;
2227- let ( stats_concentrator_service, stats_concentrator_handle) =
2228- StatsConcentratorService :: new ( Arc :: clone ( & config) ) ;
2229- tokio:: spawn ( stats_concentrator_service. run ( ) ) ;
2230- let trace_sender = Arc :: new ( SendingTraceProcessor {
2231- appsec : None ,
2232- processor : Arc :: new ( trace_processor:: ServerlessTraceProcessor {
2233- obfuscation_config : Arc :: new (
2234- ObfuscationConfig :: new ( ) . expect ( "Failed to create ObfuscationConfig" ) ,
2235- ) ,
2236- } ) ,
2237- trace_tx,
2238- stats_generator : Arc :: new ( StatsGenerator :: new ( stats_concentrator_handle) ) ,
2239- } ) ;
2240-
2241- let mut context = Context :: from_request_id ( "req-1" ) ;
2242- context. invocation_span . trace_id = 1 ;
2243- context. invocation_span . span_id = 2 ;
2244- context. client_computed_stats = true ;
2245-
2246- p. send_ctx_spans ( & tags_provider, & trace_sender, context)
2247- . await ;
2248-
2249- let payload = trace_rx
2250- . recv ( )
2251- . await
2252- . expect ( "expected payload from trace_tx" ) ;
2253- assert ! (
2254- payload. header_tags. client_computed_stats,
2255- "client_computed_stats must be propagated to the aws.lambda span payload"
2256- ) ;
2257-
2258- // Verify _dd.compute_stats is "0" in the built payload tags: client_computed_stats=true
2259- // means the tracer has already computed stats, so neither extension nor backend should.
2260- let send_data = payload. builder . build ( ) ;
2261- let libdd_trace_utils:: tracer_payload:: TracerPayloadCollection :: V07 ( payloads) =
2262- send_data. get_payloads ( )
2263- else {
2264- panic ! ( "expected V07 payload" ) ;
2265- } ;
2266- for p in payloads {
2267- assert_eq ! (
2268- p. tags. get( crate :: tags:: lambda:: tags:: COMPUTE_STATS_KEY ) ,
2269- Some ( & "0" . to_string( ) ) ,
2270- "_dd.compute_stats must be 0 when client_computed_stats is true"
2271- ) ;
2272- }
2273- }
2274-
22752207 fn make_trace_sender ( config : Arc < config:: Config > ) -> Arc < SendingTraceProcessor > {
22762208 use libdd_trace_obfuscation:: obfuscation_config:: ObfuscationConfig ;
22772209 let ( stats_concentrator_service, stats_concentrator_handle) =
0 commit comments