Skip to content

Commit 8883ba1

Browse files
committed
initial commit
1 parent 087b5e9 commit 8883ba1

3 files changed

Lines changed: 88 additions & 106 deletions

File tree

datadog_lambda/patch.py

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -31,14 +31,16 @@ def patch_all():
3131

3232
if config.trace_enabled:
3333
patch_all_dd()
34-
# Todo: remove this for PR. This is just a testing helper
35-
# Manually patch the durable execution integration since it may not
36-
# be registered in the PyPI ddtrace's _monkey.py yet.
34+
# Todo: remove this for PR. This is just a testing helper.
35+
# Call the aws_durable_execution_sdk_python integration's patch()
36+
# directly because PyPI ddtrace's _monkey.py doesn't know about it yet,
37+
# so ddtrace.patch(aws_durable_execution_sdk_python=True) would be a no-op.
3738
try:
38-
from ddtrace import patch as _patch_dd
39-
_patch_dd(aws_durable_execution_sdk_python=True)
40-
except Exception:
41-
pass
39+
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import patch as _patch_ade
40+
_patch_ade()
41+
print("[DD-DURABLE] aws_durable_execution_sdk_python integration patched")
42+
except Exception as e:
43+
print(f"[DD-DURABLE] Failed to patch aws_durable_execution_sdk_python: {e}")
4244
else:
4345
_patch_http()
4446
_ensure_patch_requests()

datadog_lambda/tracing.py

Lines changed: 66 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -659,19 +659,19 @@ def is_durable_execution_replay(event):
659659
return is_replay
660660

661661

662+
_TRACE_CHECKPOINT_PREFIX = "_dd_trace_context_"
663+
664+
662665
def extract_context_from_durable_execution(event, lambda_context):
663666
"""
664667
Extract Datadog trace context from AWS Lambda Durable Execution event.
665668
666-
Looks for extra trace context checkpoints created by the dd-trace plugin.
667-
These are STEP operations with Name="_dd_trace_context" that store trace
668-
headers in their StepDetails.Result payload. Customer operation payloads
669-
are never read or modified.
670-
671-
Scans operations in reverse to find the LAST trace checkpoint, which
672-
corresponds to the most recently completed customer operation. This gives
673-
proper parent chaining: each invocation's root span is parented to the
674-
last operation span from the previous invocation.
669+
Looks for trace-context checkpoints created by the dd-trace-py integration
670+
on previous invocations. Checkpoints are STEP operations named
671+
``_dd_trace_context_{N}`` with trace headers stored as their StepDetails.Result
672+
payload. The one with the highest ``{N}`` wins — it corresponds to the
673+
latest trace-context state from the previous invocation. Customer
674+
operation payloads are never read or modified.
675675
"""
676676
try:
677677
if not isinstance(event, dict):
@@ -687,54 +687,59 @@ def extract_context_from_durable_execution(event, lambda_context):
687687

688688
print(f"[DD-DURABLE] Found {len(operations)} operations in InitialExecutionState")
689689

690-
# Scan in reverse to find the LAST trace context checkpoint
691-
# (corresponds to the most recently completed customer operation)
692-
for idx in range(len(operations) - 1, -1, -1):
693-
operation = operations[idx]
690+
# Collect all _dd_trace_context_{N} checkpoints, then pick the highest N
691+
candidates = [] # list of (number, operation)
692+
for operation in operations:
694693
op_name = operation.get("Name")
695-
696-
if op_name != "_dd_trace_context":
694+
if not op_name or not op_name.startswith(_TRACE_CHECKPOINT_PREFIX):
697695
continue
696+
suffix = op_name[len(_TRACE_CHECKPOINT_PREFIX):]
697+
try:
698+
number = int(suffix)
699+
except ValueError:
700+
continue
701+
candidates.append((number, operation))
698702

699-
operation_id = operation.get("Id")
700-
print(f"[DD-DURABLE] Found trace checkpoint: id={operation_id}, index={idx}")
703+
if not candidates:
704+
print("[DD-DURABLE] No trace context checkpoints found in operations")
705+
return None
701706

702-
# Trace context is in StepDetails.Result (standard STEP format)
703-
step_details = operation.get("StepDetails", {})
704-
payload_str = step_details.get("Result")
707+
candidates.sort(key=lambda t: t[0])
708+
number, operation = candidates[-1]
709+
operation_id = operation.get("Id")
710+
op_name = operation.get("Name")
711+
print(f"[DD-DURABLE] Using latest trace checkpoint: name={op_name}, id={operation_id}")
705712

706-
if not payload_str:
707-
print(f"[DD-DURABLE] Trace checkpoint {operation_id} has no Result, skipping")
708-
continue
713+
step_details = operation.get("StepDetails", {})
714+
payload_str = step_details.get("Result")
715+
if not payload_str:
716+
print(f"[DD-DURABLE] Trace checkpoint {op_name} has no Result, skipping")
717+
return None
709718

710-
try:
711-
payload = json.loads(payload_str)
712-
if not isinstance(payload, dict):
713-
print(f"[DD-DURABLE] Trace checkpoint payload is not a dict: {type(payload)}")
714-
continue
715-
716-
trace_id = payload.get("x-datadog-trace-id")
717-
span_id = payload.get("x-datadog-parent-id")
718-
719-
if trace_id and span_id:
720-
# Use HTTPPropagator to restore full context including
721-
# baggage, _dd.p.* tags, origin, and sampling priority
722-
context = propagator.extract(payload)
723-
if context and context.trace_id:
724-
print(f"[DD-DURABLE] Extracted trace context from trace checkpoint {operation_id}")
725-
print(f"[DD-DURABLE] trace_id={trace_id}, span_id={span_id}, headers={list(payload.keys())}")
726-
logger.debug(
727-
"Extracted Datadog trace context from trace checkpoint %s: %s",
728-
operation_id,
729-
context,
730-
)
731-
return context
732-
except (json.JSONDecodeError, TypeError, ValueError) as e:
733-
print(f"[DD-DURABLE] Failed to parse trace checkpoint payload: {e}")
734-
logger.debug("Failed to parse trace checkpoint payload: %s", e)
735-
continue
719+
try:
720+
payload = json.loads(payload_str)
721+
except (json.JSONDecodeError, TypeError, ValueError) as e:
722+
print(f"[DD-DURABLE] Failed to parse trace checkpoint payload: {e}")
723+
logger.debug("Failed to parse trace checkpoint payload: %s", e)
724+
return None
725+
726+
if not isinstance(payload, dict):
727+
print(f"[DD-DURABLE] Trace checkpoint payload is not a dict: {type(payload)}")
728+
return None
736729

737-
print("[DD-DURABLE] No trace context checkpoints found in operations")
730+
context = propagator.extract(payload)
731+
if context and context.trace_id:
732+
print(
733+
f"[DD-DURABLE] Extracted trace context from {op_name}: "
734+
f"trace_id={context.trace_id}, span_id={context.span_id}, "
735+
f"headers={list(payload.keys())}"
736+
)
737+
logger.debug(
738+
"Extracted Datadog trace context from trace checkpoint %s: %s",
739+
op_name,
740+
context,
741+
)
742+
return context
738743
except Exception as e:
739744
logger.debug("Failed to extract trace context from durable execution: %s", e)
740745

@@ -1161,12 +1166,16 @@ def process_injected_data(event, request_time_epoch_ms, args, tags):
11611166
start_time_ns = int(
11621167
injected_authorizer_data.get(Headers.Parent_Span_Finish_Time)
11631168
)
1164-
integration_latency = int(
1165-
event["requestContext"]["authorizer"].get("integrationLatency", 0)
1166-
)
1167-
finish_time_ns = max(
1168-
start_time_ns, (request_time_epoch_ms + integration_latency) * 1e6
1169-
)
1169+
finish_time_ns = (
1170+
request_time_epoch_ms
1171+
+ (
1172+
int(
1173+
event["requestContext"]["authorizer"].get(
1174+
"integrationLatency", 0
1175+
)
1176+
)
1177+
)
1178+
) * 1e6
11701179
upstream_authorizer_span = insert_upstream_authorizer_span(
11711180
args, tags, start_time_ns, finish_time_ns
11721181
)
@@ -1629,9 +1638,9 @@ def create_function_execution_span(
16291638
trace_context_source,
16301639
merge_xray_traces,
16311640
trigger_tags,
1632-
durable_function_tags=None,
16331641
parent_span=None,
16341642
span_pointers=None,
1643+
durable_function_tags=None,
16351644
):
16361645
tags = None
16371646
if context:
@@ -1640,7 +1649,6 @@ def create_function_execution_span(
16401649
function_arn = ":".join(tk[0:7]) if len(tk) > 7 else function_arn
16411650
function_version = tk[7] if len(tk) > 7 else "$LATEST"
16421651
tags = {
1643-
"span.kind": "server",
16441652
"cold_start": str(is_cold_start).lower(),
16451653
"function_arn": function_arn,
16461654
"function_version": function_version,

datadog_lambda/wrapper.py

Lines changed: 13 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -153,6 +153,7 @@ def __init__(self, func):
153153
self.blocking_response = None
154154
self.durable_root_span = None
155155
self.is_durable = False
156+
self.durable_status = None
156157

157158
if config.profiling_enabled and profiler:
158159
self.prof = profiler.Profiler(env=config.env, service=config.service)
@@ -307,14 +308,6 @@ def _before(self, event, context):
307308
# Component 4: On replays, returns None (context from checkpoint)
308309
self.durable_root_span = create_durable_execution_root_span(event)
309310

310-
# Store root span reference for Component 2 (checkpoint save)
311-
if self.durable_root_span:
312-
try:
313-
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import set_durable_root_span
314-
set_durable_root_span(self.durable_root_span)
315-
except Exception:
316-
pass
317-
318311
# Create aws.lambda span — child of root durable span (first invocation)
319312
# or child of checkpoint context (replay), or normal parent otherwise
320313
self.span = create_function_execution_span(
@@ -333,6 +326,7 @@ def _before(self, event, context):
333326
if config.appsec_enabled:
334327
asm_start_request(self.span, event, event_source, self.trigger_tags)
335328
self.blocking_response = get_asm_blocked_response(self.event_source)
329+
336330
else:
337331
set_correlation_ids()
338332
if config.profiling_enabled and profiler and is_new_sandbox():
@@ -359,6 +353,11 @@ def _after(self, event, context):
359353
if should_trace_cold_start:
360354
trace_ctx = tracer.current_trace_context()
361355

356+
if self.is_durable:
357+
self.durable_status = extract_durable_execution_status(
358+
self.response, event
359+
)
360+
362361
if self.span:
363362
if config.appsec_enabled and not self.blocking_response:
364363
asm_start_response(
@@ -384,45 +383,18 @@ def _after(self, event, context):
384383
if status_code:
385384
self.span.set_tag("http.status_code", status_code)
386385

387-
durable_status = extract_durable_execution_status(self.response, event)
388-
if durable_status:
386+
if self.durable_status:
389387
self.span.set_tag(
390388
"aws_lambda.durable_function.execution_status",
391-
durable_status,
389+
self.durable_status,
392390
)
393391

394392
self.span.finish()
395393

396-
# Component 3: After aws.lambda closes but BEFORE root span closes,
397-
# check if trace context was enriched during this invocation.
398-
# The context is still alive because _reactivate=True on the parent context.
399-
# This is best-effort for the end-of-invocation case; the piggyback in
400-
# _patched_create_checkpoint handles saves during handler execution.
401-
if self.is_durable:
402-
try:
403-
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import (
404-
_has_context_changed, save_updated_trace_checkpoint,
405-
_get_current_execution_state, _inject_current_context,
406-
)
407-
state = _get_current_execution_state()
408-
if state:
409-
before_headers = getattr(state, "_dd_before_trace_headers", None)
410-
if before_headers is not None and _has_context_changed(before_headers):
411-
print("[DD-DURABLE] Context changed at end of invocation, saving updated checkpoint")
412-
state._dd_saving_trace_checkpoint = True
413-
try:
414-
save_updated_trace_checkpoint(state)
415-
finally:
416-
state._dd_saving_trace_checkpoint = False
417-
current = _inject_current_context()
418-
if current:
419-
state._dd_before_trace_headers = current
420-
except Exception as e:
421-
print(f"[DD-DURABLE] Best-effort context change check in _after: {e}")
422-
423-
# Finish durable execution root span LAST (Component 1)
424-
# Component 2 (root checkpoint) is handled in _patched_create_checkpoint
425-
# piggybacked on the first operation's checkpoint call.
394+
# Finish durable execution root span LAST, after aws.lambda.
395+
# The trace-context checkpoint (for cross-invocation continuity)
396+
# is saved by dd-trace-py's aws_durable_execution_sdk_python
397+
# integration when the aws.durable_execution.execute span closes.
426398
if self.durable_root_span:
427399
self.durable_root_span.finish()
428400
print(f"[DD-DURABLE] Finished root span: trace_id={self.durable_root_span.trace_id}, span_id={self.durable_root_span.span_id}")

0 commit comments

Comments
 (0)