Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions datadog_lambda/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ def patch_all():

if config.trace_enabled:
patch_all_dd()
# Todo: remove this for PR. This is just a testing helper.
# Call the aws_durable_execution_sdk_python integration's patch()
# directly because PyPI ddtrace's _monkey.py doesn't know about it yet,
# so ddtrace.patch(aws_durable_execution_sdk_python=True) would be a no-op.
try:
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import patch as _patch_ade
_patch_ade()
print("[DD-DURABLE] aws_durable_execution_sdk_python integration patched")
except Exception as e:
print(f"[DD-DURABLE] Failed to patch aws_durable_execution_sdk_python: {e}")
else:
_patch_http()
_ensure_patch_requests()
Expand Down
208 changes: 200 additions & 8 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,185 @@ def get_injected_authorizer_data(event, is_http_api) -> dict:
logger.debug("Failed to check if invocated by an authorizer. error %s", e)


def is_durable_execution_replay(event):
"""
Check if this Lambda invocation is a durable execution replay.

A replay occurs when there are existing operations in InitialExecutionState,
meaning this invocation is resuming from a previous checkpoint rather than
starting fresh.

For replay invocations, we should skip creating inferred spans because:
- The trace context is being continued from the checkpoint
- Creating an inferred span would create a duplicate

Returns:
True if this is a replay invocation (should skip inferred span)
False if this is first invocation or not a durable execution
"""
if not isinstance(event, dict):
return False

if "DurableExecutionArn" not in event:
return False

initial_state = event.get("InitialExecutionState", {})
operations = initial_state.get("Operations", [])

# The SDK always includes the EXECUTION operation itself (1 operation on first invocation).
# A replay has >1 operations (the EXECUTION + previously completed operations).
# This aligns with the SDK's ReplayStatus logic in execution.py.
is_replay = len(operations) > 1

if is_replay:
print(f"[DD-DURABLE] Detected replay invocation with {len(operations)} existing operations")
else:
print(f"[DD-DURABLE] Detected first invocation ({len(operations)} operations)")

return is_replay


_TRACE_CHECKPOINT_PREFIX = "_dd_trace_context_"


def extract_context_from_durable_execution(event, lambda_context):
"""
Extract Datadog trace context from AWS Lambda Durable Execution event.

Looks for trace-context checkpoints created by the dd-trace-py integration
on previous invocations. Checkpoints are STEP operations named
``_dd_trace_context_{N}`` with trace headers stored as their StepDetails.Result
payload. The one with the highest ``{N}`` wins — it corresponds to the
latest trace-context state from the previous invocation. Customer
operation payloads are never read or modified.
"""
try:
if not isinstance(event, dict):
return None

if "DurableExecutionArn" not in event or "InitialExecutionState" not in event:
return None

print("[DD-DURABLE] Detected AWS Lambda Durable Execution event")

initial_state = event.get("InitialExecutionState", {})
operations = initial_state.get("Operations", [])

print(f"[DD-DURABLE] Found {len(operations)} operations in InitialExecutionState")

# Collect all _dd_trace_context_{N} checkpoints, then pick the highest N
candidates = [] # list of (number, operation)
for operation in operations:
op_name = operation.get("Name")
if not op_name or not op_name.startswith(_TRACE_CHECKPOINT_PREFIX):
continue
suffix = op_name[len(_TRACE_CHECKPOINT_PREFIX):]
try:
number = int(suffix)
except ValueError:
continue
candidates.append((number, operation))

if not candidates:
print("[DD-DURABLE] No trace context checkpoints found in operations")
return None

candidates.sort(key=lambda t: t[0])
number, operation = candidates[-1]
operation_id = operation.get("Id")
op_name = operation.get("Name")
print(f"[DD-DURABLE] Using latest trace checkpoint: name={op_name}, id={operation_id}")

step_details = operation.get("StepDetails", {})
payload_str = step_details.get("Result")
if not payload_str:
print(f"[DD-DURABLE] Trace checkpoint {op_name} has no Result, skipping")
return None

try:
payload = json.loads(payload_str)
except (json.JSONDecodeError, TypeError, ValueError) as e:
print(f"[DD-DURABLE] Failed to parse trace checkpoint payload: {e}")
logger.debug("Failed to parse trace checkpoint payload: %s", e)
return None

if not isinstance(payload, dict):
print(f"[DD-DURABLE] Trace checkpoint payload is not a dict: {type(payload)}")
return None

context = propagator.extract(payload)
if context and context.trace_id:
print(
f"[DD-DURABLE] Extracted trace context from {op_name}: "
f"trace_id={context.trace_id}, span_id={context.span_id}, "
f"headers={list(payload.keys())}"
)
logger.debug(
"Extracted Datadog trace context from trace checkpoint %s: %s",
op_name,
context,
)
return context
except Exception as e:
logger.debug("Failed to extract trace context from durable execution: %s", e)

return None


def create_durable_execution_root_span(event):
"""
Create the durable execution root span on the FIRST invocation only.

Component 1 & 4 of extracheckpoint trace propagation:
- First invocation (no checkpoint context): creates root span, returns it
- Subsequent invocations (checkpoint context found): returns None
(context already activated by extract_context_from_durable_execution,
no need to recreate root span)

Returns the root span (caller must call span.finish() when invocation ends),
or None if not a durable execution or if this is a replay.
"""
print(f"[DD-DURABLE] create_durable_execution_root_span called, event type={type(event).__name__}")
try:
if not isinstance(event, dict):
return None

execution_arn = event.get("DurableExecutionArn")
has_initial_state = "InitialExecutionState" in event
if not execution_arn or not has_initial_state:
return None

# Component 4: On replay, context is already activated from checkpoint.
# Don't recreate root span — it was already emitted in a prior invocation.
if is_durable_execution_replay(event):
print("[DD-DURABLE] Replay invocation — skipping root span creation (context from checkpoint)")
return None

# Component 1: First invocation — create new root span
service_name = os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution"
resource = execution_arn.split(":")[-1] if ":" in execution_arn else execution_arn

span = tracer.trace(
"aws.durable-execution",
service=service_name,
resource=resource,
span_type="serverless",
)

if span:
span.set_tag("durable.execution_arn", execution_arn)
print(f"[DD-DURABLE] Created root span: trace_id={span.trace_id}, span_id={span.span_id}, resource={resource}")
else:
print("[DD-DURABLE] tracer.trace() returned None")

return span

except Exception as e:
logger.debug("Failed to create durable execution root span: %s", e)
print(f"[DD-DURABLE] Failed to create root span: {e}")
return None


def extract_dd_trace_context(
event, lambda_context, extractor=None, decode_authorizer_context: bool = True
):
Expand All @@ -634,6 +813,16 @@ def extract_dd_trace_context(
trace_context_source = None
event_source = parse_event_source(event)

# Check for AWS Lambda Durable Execution events first (before other checks)
# This ensures trace context is properly continued across durable invocations
durable_context = extract_context_from_durable_execution(event, lambda_context)
if _is_context_complete(durable_context):
logger.debug("Extracted Datadog trace context from durable execution")
dd_trace_context = durable_context
trace_context_source = TraceContextSource.EVENT
logger.debug("extracted dd trace context from durable execution: %s", dd_trace_context)
return dd_trace_context, trace_context_source, event_source

if extractor is not None:
context = extract_context_custom_extractor(extractor, event, lambda_context)
elif isinstance(event, (set, dict)) and "request" in event:
Expand Down Expand Up @@ -977,12 +1166,16 @@ def process_injected_data(event, request_time_epoch_ms, args, tags):
start_time_ns = int(
injected_authorizer_data.get(Headers.Parent_Span_Finish_Time)
)
integration_latency = int(
event["requestContext"]["authorizer"].get("integrationLatency", 0)
)
finish_time_ns = max(
start_time_ns, (request_time_epoch_ms + integration_latency) * 1e6
)
finish_time_ns = (
request_time_epoch_ms
+ (
int(
event["requestContext"]["authorizer"].get(
"integrationLatency", 0
)
)
)
) * 1e6
upstream_authorizer_span = insert_upstream_authorizer_span(
args, tags, start_time_ns, finish_time_ns
)
Expand Down Expand Up @@ -1445,9 +1638,9 @@ def create_function_execution_span(
trace_context_source,
merge_xray_traces,
trigger_tags,
durable_function_tags=None,
parent_span=None,
span_pointers=None,
durable_function_tags=None,
):
tags = None
if context:
Expand All @@ -1456,7 +1649,6 @@ def create_function_execution_span(
function_arn = ":".join(tk[0:7]) if len(tk) > 7 else function_arn
function_version = tk[7] if len(tk) > 7 else "$LATEST"
tags = {
"span.kind": "server",
"cold_start": str(is_cold_start).lower(),
"function_arn": function_arn,
"function_version": function_version,
Expand Down
56 changes: 52 additions & 4 deletions datadog_lambda/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
create_inferred_span,
InferredSpanInfo,
is_authorizer_response,
is_durable_execution_replay,
create_durable_execution_root_span,
tracer,
propagator,
)
Expand Down Expand Up @@ -149,6 +151,9 @@ def __init__(self, func):
self.inferred_span = None
self.response = None
self.blocking_response = None
self.durable_root_span = None
self.is_durable = False
self.durable_status = None

if config.profiling_enabled and profiler:
self.prof = profiler.Profiler(env=config.env, service=config.service)
Expand Down Expand Up @@ -269,14 +274,42 @@ def _before(self, event, context):

if config.trace_enabled:
set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
if config.make_inferred_span:
# Skip inferred span for durable execution replays to avoid duplicates
# For replays, trace context comes from checkpoint, not from event trigger
if config.make_inferred_span and not is_durable_execution_replay(event):
self.inferred_span = create_inferred_span(
event, context, event_source, config.decode_authorizer_context
)

if config.appsec_enabled:
asm_set_context(event_source)

# For durable executions: create root span BEFORE aws.lambda span
# so aws.lambda becomes a child of the root durable execution span.
self.is_durable = isinstance(event, dict) and "DurableExecutionArn" in event
if self.is_durable:
# Set _reactivate on the active context so it persists after
# all spans close. This prevents ddtrace from purging the
# context when the last span (aws.lambda or root) finishes.
active_ctx = tracer.context_provider.active()
if active_ctx and hasattr(active_ctx, '_reactivate'):
active_ctx._reactivate = True
print(f"[DD-DURABLE] Set _reactivate=True on active context")

# For replay: copy _meta from extracted context for _dd.p.* tag propagation
# set_dd_trace_py_root only copies trace_id/span_id/sampling_priority,
# so propagation tags from the checkpoint would be lost without this.
if dd_context and hasattr(dd_context, '_meta') and active_ctx and hasattr(active_ctx, '_meta'):
for k, v in dd_context._meta.items():
if k not in active_ctx._meta:
active_ctx._meta[k] = v

# Component 1: Create root span (first invocation only)
# Component 4: On replays, returns None (context from checkpoint)
self.durable_root_span = create_durable_execution_root_span(event)

# Create aws.lambda span — child of root durable span (first invocation)
# or child of checkpoint context (replay), or normal parent otherwise
self.span = create_function_execution_span(
context=context,
function_name=config.function_name,
Expand All @@ -289,9 +322,11 @@ def _before(self, event, context):
parent_span=self.inferred_span,
span_pointers=calculate_span_pointers(event_source, event),
)

if config.appsec_enabled:
asm_start_request(self.span, event, event_source, self.trigger_tags)
self.blocking_response = get_asm_blocked_response(self.event_source)

else:
set_correlation_ids()
if config.profiling_enabled and profiler and is_new_sandbox():
Expand All @@ -318,6 +353,11 @@ def _after(self, event, context):
if should_trace_cold_start:
trace_ctx = tracer.current_trace_context()

if self.is_durable:
self.durable_status = extract_durable_execution_status(
self.response, event
)

if self.span:
if config.appsec_enabled and not self.blocking_response:
asm_start_response(
Expand All @@ -343,15 +383,23 @@ def _after(self, event, context):
if status_code:
self.span.set_tag("http.status_code", status_code)

durable_status = extract_durable_execution_status(self.response, event)
if durable_status:
if self.durable_status:
self.span.set_tag(
"aws_lambda.durable_function.execution_status",
durable_status,
self.durable_status,
)

self.span.finish()

# Finish durable execution root span LAST, after aws.lambda.
# The trace-context checkpoint (for cross-invocation continuity)
# is saved by dd-trace-py's aws_durable_execution_sdk_python
# integration when the aws.durable_execution.execute span closes.
if self.durable_root_span:
self.durable_root_span.finish()
print(f"[DD-DURABLE] Finished root span: trace_id={self.durable_root_span.trace_id}, span_id={self.durable_root_span.span_id}")
self.durable_root_span = None

if status_code:
self.trigger_tags["http.status_code"] = status_code
mark_trace_as_error_for_5xx_responses(context, status_code, self.span)
Expand Down
Loading