diff --git a/datadog_lambda/patch.py b/datadog_lambda/patch.py
index 6d2af0dc..105a712b 100644
--- a/datadog_lambda/patch.py
+++ b/datadog_lambda/patch.py
@@ -31,6 +31,23 @@ def patch_all():
 
     if config.trace_enabled:
         patch_all_dd()
+        # TODO: remove before merging; this is only a testing helper.
+        # Call the aws_durable_execution_sdk_python integration's patch()
+        # directly because PyPI ddtrace's _monkey.py doesn't know about it yet,
+        # so ddtrace.patch(aws_durable_execution_sdk_python=True) would be a no-op.
+        try:
+            from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import (
+                patch as _patch_ade,
+            )
+
+            _patch_ade()
+        except Exception as e:
+            # Best effort: a missing integration must not break patch_all().
+            import logging
+
+            logging.getLogger(__name__).debug(
+                "Failed to patch aws_durable_execution_sdk_python: %s", e
+            )
     else:
         _patch_http()
         _ensure_patch_requests()
diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py
index 3c7d9f11..001495ef 100644
--- a/datadog_lambda/tracing.py
+++ b/datadog_lambda/tracing.py
@@ -621,6 +621,202 @@ def get_injected_authorizer_data(event, is_http_api) -> dict:
         logger.debug("Failed to check if invocated by an authorizer. error %s", e)
 
 
+def _get_durable_operations(event):
+    """
+    Return the Operations list of a durable execution event dict.
+
+    A missing, null or wrongly typed InitialExecutionState / Operations
+    value yields an empty list instead of raising.
+    """
+    initial_state = event.get("InitialExecutionState")
+    if not isinstance(initial_state, dict):
+        return []
+    operations = initial_state.get("Operations")
+    return operations if isinstance(operations, list) else []
+
+
+def is_durable_execution_replay(event):
+    """
+    Check if this Lambda invocation is a durable execution replay.
+
+    A replay occurs when there are existing operations in InitialExecutionState,
+    meaning this invocation is resuming from a previous checkpoint rather than
+    starting fresh.
+
+    For replay invocations, we should skip creating inferred spans because:
+    - The trace context is being continued from the checkpoint
+    - Creating an inferred span would create a duplicate
+
+    Malformed or null execution state is treated as "not a replay"; this
+    function does not raise.
+
+    Returns:
+        True if this is a replay invocation (should skip inferred span)
+        False if this is first invocation or not a durable execution
+    """
+    if not isinstance(event, dict) or "DurableExecutionArn" not in event:
+        return False
+
+    operations = _get_durable_operations(event)
+
+    # The SDK always includes the EXECUTION operation itself (1 operation on
+    # first invocation). A replay has >1 operations (the EXECUTION + previously
+    # completed operations). This aligns with the SDK's ReplayStatus logic in
+    # execution.py.
+    is_replay = len(operations) > 1
+    logger.debug(
+        "Durable execution %s invocation (%d operations)",
+        "replay" if is_replay else "first",
+        len(operations),
+    )
+    return is_replay
+
+
+_TRACE_CHECKPOINT_PREFIX = "_dd_trace_context_"
+
+
+def _find_latest_trace_checkpoint(operations):
+    """
+    Return the ``_dd_trace_context_{N}`` operation with the highest N, or None.
+
+    Entries that are not dicts, or whose name does not end in an integer, are
+    skipped. When two checkpoints share the same N, the later one wins.
+    """
+    latest = None
+    latest_number = None
+    for operation in operations:
+        if not isinstance(operation, dict):
+            continue
+        op_name = operation.get("Name")
+        if not isinstance(op_name, str):
+            continue
+        if not op_name.startswith(_TRACE_CHECKPOINT_PREFIX):
+            continue
+        try:
+            number = int(op_name[len(_TRACE_CHECKPOINT_PREFIX) :])
+        except ValueError:
+            continue
+        if latest_number is None or number >= latest_number:
+            latest, latest_number = operation, number
+    return latest
+
+
+def extract_context_from_durable_execution(event, lambda_context):
+    """
+    Extract Datadog trace context from AWS Lambda Durable Execution event.
+
+    Looks for trace-context checkpoints created by the dd-trace-py integration
+    on previous invocations. Checkpoints are STEP operations named
+    ``_dd_trace_context_{N}`` with trace headers stored as their StepDetails.Result
+    payload. The one with the highest ``{N}`` wins — it corresponds to the
+    latest trace-context state from the previous invocation. Customer
+    operation payloads are never read or modified.
+
+    ``lambda_context`` is currently unused.
+
+    Returns the extracted context, or None when the event is not a durable
+    execution, carries no usable checkpoint, or extraction fails.
+    """
+    try:
+        if not isinstance(event, dict):
+            return None
+
+        if "DurableExecutionArn" not in event or "InitialExecutionState" not in event:
+            return None
+
+        operation = _find_latest_trace_checkpoint(_get_durable_operations(event))
+        if operation is None:
+            logger.debug("No trace context checkpoints found in durable operations")
+            return None
+
+        op_name = operation.get("Name")
+        step_details = operation.get("StepDetails")
+        payload_str = (
+            step_details.get("Result") if isinstance(step_details, dict) else None
+        )
+        if not payload_str:
+            logger.debug("Trace checkpoint %s has no Result, skipping", op_name)
+            return None
+
+        try:
+            payload = json.loads(payload_str)
+        except (json.JSONDecodeError, TypeError, ValueError) as e:
+            logger.debug("Failed to parse trace checkpoint payload: %s", e)
+            return None
+
+        if not isinstance(payload, dict):
+            logger.debug(
+                "Trace checkpoint payload is not a dict: %s", type(payload)
+            )
+            return None
+
+        context = propagator.extract(payload)
+        if context and context.trace_id:
+            logger.debug(
+                "Extracted Datadog trace context from trace checkpoint %s: %s",
+                op_name,
+                context,
+            )
+            return context
+    except Exception as e:
+        logger.debug("Failed to extract trace context from durable execution: %s", e)
+
+    return None
+
+
+def create_durable_execution_root_span(event):
+    """
+    Create the durable execution root span on the FIRST invocation only.
+
+    Component 1 & 4 of checkpoint-based trace propagation:
+    - First invocation (no checkpoint context): creates root span, returns it
+    - Subsequent invocations (checkpoint context found): returns None
+      (context already activated by extract_context_from_durable_execution,
+      no need to recreate root span)
+
+    Returns the root span (caller must call span.finish() when invocation ends),
+    or None if not a durable execution, if this is a replay, or on failure.
+    """
+    try:
+        if not isinstance(event, dict):
+            return None
+
+        execution_arn = event.get("DurableExecutionArn")
+        if not execution_arn or "InitialExecutionState" not in event:
+            return None
+
+        # Component 4: On replay, context is already activated from checkpoint.
+        # Don't recreate root span — it was already emitted in a prior invocation.
+        if is_durable_execution_replay(event):
+            logger.debug("Durable replay invocation: skipping root span creation")
+            return None
+
+        # Component 1: First invocation — create new root span
+        service_name = (
+            os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution"
+        )
+        # Last ":"-separated segment of the ARN (the whole value if it has none).
+        resource = execution_arn.split(":")[-1]
+
+        span = tracer.trace(
+            "aws.durable-execution",
+            service=service_name,
+            resource=resource,
+            span_type="serverless",
+        )
+        if span:
+            span.set_tag("durable.execution_arn", execution_arn)
+            logger.debug(
+                "Created durable execution root span: trace_id=%s, span_id=%s",
+                span.trace_id,
+                span.span_id,
+            )
+        return span
+    except Exception as e:
+        logger.debug("Failed to create durable execution root span: %s", e)
+        return None
+
+
 def extract_dd_trace_context(
     event, lambda_context, extractor=None, decode_authorizer_context: bool = True
 ):
@@ -634,6 +830,17 @@ def extract_dd_trace_context(
     trace_context_source = None
     event_source = parse_event_source(event)
 
+    # Check for AWS Lambda Durable Execution events first (before other checks)
+    # This ensures trace context is properly continued across durable invocations
+    durable_context = extract_context_from_durable_execution(event, lambda_context)
+    if _is_context_complete(durable_context):
+        dd_trace_context = durable_context
+        trace_context_source = TraceContextSource.EVENT
+        logger.debug(
+            "extracted dd trace context from durable execution: %s", dd_trace_context
+        )
+        return dd_trace_context, trace_context_source, event_source
+
     if extractor is not None:
         context = extract_context_custom_extractor(extractor, event, lambda_context)
     elif isinstance(event, (set, dict)) and "request" in event:
diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 767816a5..718473e6 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -39,6 +39,8 @@
     create_inferred_span,
     InferredSpanInfo,
     is_authorizer_response,
+    is_durable_execution_replay,
+    create_durable_execution_root_span,
     tracer,
     propagator,
 )
@@ -149,6 +151,9 @@ def __init__(self, func):
         self.inferred_span = None
         self.response = None
         self.blocking_response = None
+        self.durable_root_span = None
+        self.is_durable = False
+        self.durable_status = None
 
         if config.profiling_enabled and profiler:
             self.prof = profiler.Profiler(env=config.env, service=config.service)
@@ -269,7 +274,9 @@ def _before(self, event, context):
 
             if config.trace_enabled:
                 set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
-                if config.make_inferred_span:
+                # Skip inferred span for durable execution replays to avoid duplicates
+                # For replays, trace context comes from checkpoint, not from event trigger
+                if config.make_inferred_span and not is_durable_execution_replay(event):
                     self.inferred_span = create_inferred_span(
                         event, context, event_source, config.decode_authorizer_context
                     )
@@ -277,6 +284,39 @@ def _before(self, event, context):
                 if config.appsec_enabled:
                     asm_set_context(event_source)
 
+                # For durable executions: create root span BEFORE aws.lambda span
+                # so aws.lambda becomes a child of the root durable execution span.
+                self.is_durable = (
+                    isinstance(event, dict) and "DurableExecutionArn" in event
+                )
+                if self.is_durable:
+                    # Set _reactivate on the active context so it persists after
+                    # all spans close. This prevents ddtrace from purging the
+                    # context when the last span (aws.lambda or root) finishes.
+                    active_ctx = tracer.context_provider.active()
+                    if active_ctx and hasattr(active_ctx, "_reactivate"):
+                        active_ctx._reactivate = True
+
+                    # For replay: copy _meta from extracted context for _dd.p.* tag
+                    # propagation. set_dd_trace_py_root only copies
+                    # trace_id/span_id/sampling_priority, so propagation tags from
+                    # the checkpoint would be lost without this.
+                    if (
+                        dd_context
+                        and hasattr(dd_context, "_meta")
+                        and active_ctx
+                        and hasattr(active_ctx, "_meta")
+                    ):
+                        for k, v in dd_context._meta.items():
+                            if k not in active_ctx._meta:
+                                active_ctx._meta[k] = v
+
+                    # Component 1: Create root span (first invocation only)
+                    # Component 4: On replays, returns None (context from checkpoint)
+                    self.durable_root_span = create_durable_execution_root_span(event)
+
+                # Create aws.lambda span — child of root durable span (first invocation)
+                # or child of checkpoint context (replay), or normal parent otherwise
                 self.span = create_function_execution_span(
                     context=context,
                     function_name=config.function_name,
@@ -318,6 +358,14 @@ def _after(self, event, context):
             if should_trace_cold_start:
                 trace_ctx = tracer.current_trace_context()
 
+            # Assigned unconditionally so a value left over from an earlier
+            # call to _after cannot be tagged onto this invocation's span.
+            self.durable_status = (
+                extract_durable_execution_status(self.response, event)
+                if self.is_durable
+                else None
+            )
+
             if self.span:
                 if config.appsec_enabled and not self.blocking_response:
                     asm_start_response(
@@ -343,15 +391,22 @@ def _after(self, event, context):
                 if status_code:
                     self.span.set_tag("http.status_code", status_code)
 
-                durable_status = extract_durable_execution_status(self.response, event)
-                if durable_status:
+                if self.durable_status:
                     self.span.set_tag(
                         "aws_lambda.durable_function.execution_status",
-                        durable_status,
+                        self.durable_status,
                     )
 
                 self.span.finish()
 
+            # Finish durable execution root span LAST, after aws.lambda.
+            # The trace-context checkpoint (for cross-invocation continuity)
+            # is saved by dd-trace-py's aws_durable_execution_sdk_python
+            # integration when the aws.durable_execution.execute span closes.
+            if self.durable_root_span:
+                self.durable_root_span.finish()
+                self.durable_root_span = None
+
             if status_code:
                 self.trigger_tags["http.status_code"] = status_code
                 mark_trace_as_error_for_5xx_responses(context, status_code, self.span)