Skip to content

Commit d9d6551

Browse files
committed
initial checkin
1 parent c441931 commit d9d6551

2 files changed

Lines changed: 261 additions & 1 deletion

File tree

datadog_lambda/tracing.py

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,180 @@ def get_injected_authorizer_data(event, is_http_api) -> dict:
621621
logger.debug("Failed to check if invocated by an authorizer. error %s", e)
622622

623623

624+
def is_durable_execution_replay(event):
    """
    Check if this Lambda invocation is a durable execution replay.

    The event is treated as a durable execution when it is a dict carrying a
    "DurableExecutionArn" key. It is a replay when
    InitialExecutionState.Operations holds more than one entry: the SDK always
    includes the EXECUTION operation itself (1 operation on first invocation),
    so >1 operations means previously completed operations are being resumed.
    This aligns with the SDK's ReplayStatus logic in execution.py.

    Malformed input (non-dict event, missing/null/non-dict
    InitialExecutionState, missing/null/non-list Operations) is reported as
    "not a replay" instead of raising.

    Args:
        event: The raw Lambda event; may be of any type.

    Returns:
        True if this is a replay invocation (callers skip the inferred span).
        False if this is the first invocation or not a durable execution.
    """
    if not isinstance(event, dict) or "DurableExecutionArn" not in event:
        return False

    # `event.get(key, {})` only substitutes the default when the key is
    # absent; an explicit null would still reach `.get` / `len` and raise.
    # Validate the shapes explicitly so this predicate can never throw.
    initial_state = event.get("InitialExecutionState")
    if not isinstance(initial_state, dict):
        initial_state = {}

    operations = initial_state.get("Operations")
    if not isinstance(operations, (list, tuple)):
        operations = ()

    operation_count = len(operations)
    is_replay = operation_count > 1

    if is_replay:
        logger.debug(
            "Durable execution: detected replay invocation with %d existing "
            "operations",
            operation_count,
        )
    else:
        logger.debug(
            "Durable execution: detected first invocation (%d operations)",
            operation_count,
        )

    return is_replay
660+
661+
662+
def extract_context_from_durable_execution(event, lambda_context):
    """
    Extract Datadog trace context from an AWS Lambda Durable Execution event.

    Looks for operations named "_dd_trace_context" inside
    event["InitialExecutionState"]["Operations"]. Each such operation is
    expected to carry a JSON object of propagation headers in
    StepDetails.Result. Other operations are never read beyond their "Name".

    Operations are scanned in reverse so the LAST usable trace checkpoint
    wins. A checkpoint is usable when its payload parses to a dict containing
    both "x-datadog-trace-id" and "x-datadog-parent-id" and the propagator
    yields a context with a trace id. Unusable or malformed entries are
    skipped so that an earlier checkpoint can still be used.

    Args:
        event: The raw Lambda event; may be of any type.
        lambda_context: Unused here; kept for signature parity with the
            other extractors.

    Returns:
        The context returned by ``propagator.extract`` for the most recent
        usable checkpoint, or None when the event is not a durable execution,
        no usable checkpoint exists, or any unexpected error occurs.
    """
    try:
        if not isinstance(event, dict):
            return None

        if "DurableExecutionArn" not in event or "InitialExecutionState" not in event:
            return None

        initial_state = event.get("InitialExecutionState")
        operations = (
            initial_state.get("Operations")
            if isinstance(initial_state, dict)
            else None
        )
        if not isinstance(operations, (list, tuple)):
            logger.debug(
                "Durable execution event has no usable Operations list; "
                "no trace context to extract"
            )
            return None

        logger.debug(
            "Durable execution: found %d operations in InitialExecutionState",
            len(operations),
        )

        # Newest checkpoint first: it corresponds to the most recently
        # completed operation of the previous invocation.
        for operation in reversed(operations):
            # A malformed entry must not abort the scan of the remaining ones.
            if not isinstance(operation, dict):
                continue
            if operation.get("Name") != "_dd_trace_context":
                continue

            operation_id = operation.get("Id")

            # Trace context is in StepDetails.Result (standard STEP format).
            step_details = operation.get("StepDetails")
            payload_str = (
                step_details.get("Result")
                if isinstance(step_details, dict)
                else None
            )
            if not payload_str:
                logger.debug(
                    "Trace checkpoint %s has no Result, skipping", operation_id
                )
                continue

            try:
                payload = json.loads(payload_str)
                if not isinstance(payload, dict):
                    logger.debug(
                        "Trace checkpoint %s payload is not a dict: %s",
                        operation_id,
                        type(payload),
                    )
                    continue

                trace_id = payload.get("x-datadog-trace-id")
                span_id = payload.get("x-datadog-parent-id")
                if not (trace_id and span_id):
                    continue

                # Use the propagator to restore the full context (not just
                # the two ids checked above) from the stored headers.
                context = propagator.extract(payload)
                if context and context.trace_id:
                    logger.debug(
                        "Extracted Datadog trace context from trace checkpoint %s: %s",
                        operation_id,
                        context,
                    )
                    return context
            except (json.JSONDecodeError, TypeError, ValueError) as e:
                # Fall through to the next (older) checkpoint.
                logger.debug("Failed to parse trace checkpoint payload: %s", e)
                continue

        logger.debug("No usable trace context checkpoint found in operations")
    except Exception as e:
        logger.debug("Failed to extract trace context from durable execution: %s", e)

    return None
742+
743+
744+
def create_durable_execution_root_span(event):
    """
    Create the durable execution root span on the FIRST invocation only.

    Components 1 & 4 of extra-checkpoint trace propagation:
    - First invocation (is_durable_execution_replay(event) is False):
      starts an "aws.durable-execution" span and returns it.
    - Replay invocation (is_durable_execution_replay(event) is True):
      returns None; no new root span is started.

    The span's service comes from the DD_DURABLE_EXECUTION_SERVICE
    environment variable (default "aws.durable-execution"); its resource is
    the last ":"-separated segment of the execution ARN, and the full ARN is
    stored in the "durable.execution_arn" tag.

    Args:
        event: The raw Lambda event; may be of any type.

    Returns:
        The started root span (caller must call span.finish() when the
        invocation ends), or None if the event is not a durable execution
        (not a dict, empty/missing "DurableExecutionArn", or missing
        "InitialExecutionState"), if this is a replay, or if span creation
        fails for any reason.
    """
    try:
        logger.debug(
            "create_durable_execution_root_span called, event type=%s",
            type(event).__name__,
        )

        if not isinstance(event, dict):
            return None

        execution_arn = event.get("DurableExecutionArn")
        if not execution_arn or "InitialExecutionState" not in event:
            return None

        # Component 4: on replay, don't start another root span.
        if is_durable_execution_replay(event):
            logger.debug(
                "Durable execution replay invocation; skipping root span creation"
            )
            return None

        # Component 1: first invocation, create the new root span.
        service_name = (
            os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution"
        )
        # rsplit returns the whole string when there is no ":", so no
        # separate membership check is needed.
        resource = execution_arn.rsplit(":", 1)[-1]

        span = tracer.trace(
            "aws.durable-execution",
            service=service_name,
            resource=resource,
            span_type="serverless",
        )

        if span:
            span.set_tag("durable.execution_arn", execution_arn)
            logger.debug(
                "Created durable execution root span: trace_id=%s, span_id=%s, "
                "resource=%s",
                span.trace_id,
                span.span_id,
                resource,
            )
        else:
            logger.debug("tracer.trace() returned no durable execution root span")

        return span

    except Exception as e:
        # Best effort: tracing must never break the invocation.
        logger.debug("Failed to create durable execution root span: %s", e)
        return None
796+
797+
624798
def extract_dd_trace_context(
625799
event, lambda_context, extractor=None, decode_authorizer_context: bool = True
626800
):
@@ -634,6 +808,16 @@ def extract_dd_trace_context(
634808
trace_context_source = None
635809
event_source = parse_event_source(event)
636810

811+
# Check for AWS Lambda Durable Execution events first (before other checks)
812+
# This ensures trace context is properly continued across durable invocations
813+
durable_context = extract_context_from_durable_execution(event, lambda_context)
814+
if _is_context_complete(durable_context):
815+
logger.debug("Extracted Datadog trace context from durable execution")
816+
dd_trace_context = durable_context
817+
trace_context_source = TraceContextSource.EVENT
818+
logger.debug("extracted dd trace context from durable execution: %s", dd_trace_context)
819+
return dd_trace_context, trace_context_source, event_source
820+
637821
if extractor is not None:
638822
context = extract_context_custom_extractor(extractor, event, lambda_context)
639823
elif isinstance(event, (set, dict)) and "request" in event:

datadog_lambda/wrapper.py

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
create_inferred_span,
4040
InferredSpanInfo,
4141
is_authorizer_response,
42+
is_durable_execution_replay,
43+
create_durable_execution_root_span,
4244
tracer,
4345
propagator,
4446
)
@@ -149,6 +151,8 @@ def __init__(self, func):
149151
self.inferred_span = None
150152
self.response = None
151153
self.blocking_response = None
154+
self.durable_root_span = None
155+
self.is_durable = False
152156

153157
if config.profiling_enabled and profiler:
154158
self.prof = profiler.Profiler(env=config.env, service=config.service)
@@ -269,14 +273,50 @@ def _before(self, event, context):
269273

270274
if config.trace_enabled:
271275
set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
272-
if config.make_inferred_span:
276+
# Skip inferred span for durable execution replays to avoid duplicates
277+
# For replays, trace context comes from checkpoint, not from event trigger
278+
if config.make_inferred_span and not is_durable_execution_replay(event):
273279
self.inferred_span = create_inferred_span(
274280
event, context, event_source, config.decode_authorizer_context
275281
)
276282

277283
if config.appsec_enabled:
278284
asm_set_context(event_source)
279285

286+
# For durable executions: create root span BEFORE aws.lambda span
287+
# so aws.lambda becomes a child of the root durable execution span.
288+
self.is_durable = isinstance(event, dict) and "DurableExecutionArn" in event
289+
if self.is_durable:
290+
# Set _reactivate on the active context so it persists after
291+
# all spans close. This prevents ddtrace from purging the
292+
# context when the last span (aws.lambda or root) finishes.
293+
active_ctx = tracer.context_provider.active()
294+
if active_ctx and hasattr(active_ctx, '_reactivate'):
295+
active_ctx._reactivate = True
296+
print(f"[DD-DURABLE] Set _reactivate=True on active context")
297+
298+
# For replay: copy _meta from extracted context for _dd.p.* tag propagation
299+
# set_dd_trace_py_root only copies trace_id/span_id/sampling_priority,
300+
# so propagation tags from the checkpoint would be lost without this.
301+
if dd_context and hasattr(dd_context, '_meta') and active_ctx and hasattr(active_ctx, '_meta'):
302+
for k, v in dd_context._meta.items():
303+
if k not in active_ctx._meta:
304+
active_ctx._meta[k] = v
305+
306+
# Component 1: Create root span (first invocation only)
307+
# Component 4: On replays, returns None (context from checkpoint)
308+
self.durable_root_span = create_durable_execution_root_span(event)
309+
310+
# Store root span reference for Component 2 (checkpoint save)
311+
if self.durable_root_span:
312+
try:
313+
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import set_durable_root_span
314+
set_durable_root_span(self.durable_root_span)
315+
except Exception:
316+
pass
317+
318+
# Create aws.lambda span — child of root durable span (first invocation)
319+
# or child of checkpoint context (replay), or normal parent otherwise
280320
self.span = create_function_execution_span(
281321
context=context,
282322
function_name=config.function_name,
@@ -289,6 +329,7 @@ def _before(self, event, context):
289329
parent_span=self.inferred_span,
290330
span_pointers=calculate_span_pointers(event_source, event),
291331
)
332+
292333
if config.appsec_enabled:
293334
asm_start_request(self.span, event, event_source, self.trigger_tags)
294335
self.blocking_response = get_asm_blocked_response(self.event_source)
@@ -352,6 +393,41 @@ def _after(self, event, context):
352393

353394
self.span.finish()
354395

396+
# Component 3: After aws.lambda closes but BEFORE root span closes,
397+
# check if trace context was enriched during this invocation.
398+
# The context is still alive because _reactivate=True on the parent context.
399+
# This is best-effort for the end-of-invocation case; the piggyback in
400+
# _patched_create_checkpoint handles saves during handler execution.
401+
if self.is_durable:
402+
try:
403+
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import (
404+
_has_context_changed, save_updated_trace_checkpoint,
405+
_get_current_execution_state, _inject_current_context,
406+
)
407+
state = _get_current_execution_state()
408+
if state:
409+
before_headers = getattr(state, "_dd_before_trace_headers", None)
410+
if before_headers is not None and _has_context_changed(before_headers):
411+
print("[DD-DURABLE] Context changed at end of invocation, saving updated checkpoint")
412+
state._dd_saving_trace_checkpoint = True
413+
try:
414+
save_updated_trace_checkpoint(state)
415+
finally:
416+
state._dd_saving_trace_checkpoint = False
417+
current = _inject_current_context()
418+
if current:
419+
state._dd_before_trace_headers = current
420+
except Exception as e:
421+
print(f"[DD-DURABLE] Best-effort context change check in _after: {e}")
422+
423+
# Finish durable execution root span LAST (Component 1)
424+
# Component 2 (root checkpoint) is handled in _patched_create_checkpoint
425+
# piggybacked on the first operation's checkpoint call.
426+
if self.durable_root_span:
427+
self.durable_root_span.finish()
428+
print(f"[DD-DURABLE] Finished root span: trace_id={self.durable_root_span.trace_id}, span_id={self.durable_root_span.span_id}")
429+
self.durable_root_span = None
430+
355431
if status_code:
356432
self.trigger_tags["http.status_code"] = status_code
357433
mark_trace_as_error_for_5xx_responses(context, status_code, self.span)

0 commit comments

Comments
 (0)