Skip to content

Commit 6568d02

Browse files
committed
move the durable function related extraction logic to durable.py
1 parent 205dd2b commit 6568d02

2 files changed

Lines changed: 60 additions & 55 deletions

File tree

datadog_lambda/durable.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44
# Copyright 2019 Datadog, Inc.
55
import logging
66
import re
7+
import ujson as json
78

89
logger = logging.getLogger(__name__)
910

11+
_TRACE_CHECKPOINT_PREFIX = "_datadog_"
12+
1013

1114
def _parse_durable_execution_arn(arn):
1215
"""
@@ -58,6 +61,62 @@ def extract_durable_function_tags(event):
5861
VALID_DURABLE_STATUSES = {"SUCCEEDED", "FAILED", "PENDING"}
5962

6063

64+
def _extract_context_from_durable_checkpoint(operation):
65+
# Checkpoint data is written by the dd-trace-py in Datadog style
66+
# (x-datadog-* headers). Extraction goes through the standard
67+
# propagator.extract path, which honors DD_TRACE_PROPAGATION_STYLE_EXTRACT.
68+
# The default extract list (datadog, tracecontext, baggage) already
69+
# includes datadog. Customers who override the extract list MUST keep
70+
# datadog in it.
71+
if not isinstance(operation, dict):
72+
return None
73+
74+
step_details = operation.get("StepDetails")
75+
if not isinstance(step_details, dict):
76+
return None
77+
78+
result = step_details.get("Result")
79+
if isinstance(result, str):
80+
try:
81+
result = json.loads(result)
82+
except Exception:
83+
return None
84+
85+
if not isinstance(result, dict):
86+
return None
87+
88+
from datadog_lambda.tracing import propagator
89+
90+
return propagator.extract(result)
91+
92+
93+
def extract_context_from_durable_execution(event):
94+
operations = event.get("InitialExecutionState", {}).get("Operations")
95+
if isinstance(operations, dict):
96+
operations = list(operations.values())
97+
if not isinstance(operations, list) or not operations:
98+
return None
99+
100+
highest = -1
101+
best_operation = None
102+
for operation in operations:
103+
if not isinstance(operation, dict):
104+
continue
105+
name = operation.get("Name")
106+
if not isinstance(name, str) or not name.startswith(_TRACE_CHECKPOINT_PREFIX):
107+
continue
108+
suffix = name[len(_TRACE_CHECKPOINT_PREFIX) :]
109+
try:
110+
number = int(suffix)
111+
except (TypeError, ValueError):
112+
continue
113+
if number > highest:
114+
highest = number
115+
best_operation = operation
116+
117+
return _extract_context_from_durable_checkpoint(best_operation)
118+
119+
61120
def extract_durable_execution_status(response, event):
62121
if not isinstance(event, dict) or "DurableExecutionArn" not in event:
63122
return None

datadog_lambda/tracing.py

Lines changed: 1 addition & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
EventTypes,
4545
EventSubtypes,
4646
)
47+
from datadog_lambda.durable import extract_context_from_durable_execution
4748

4849
if config.otel_enabled:
4950
from opentelemetry.trace import set_tracer_provider
@@ -61,7 +62,6 @@
6162
DD_TRACE_JAVA_TRACE_ID_PADDING = "00000000"
6263
HIGHER_64_BITS = "HIGHER_64_BITS"
6364
LOWER_64_BITS = "LOWER_64_BITS"
64-
_TRACE_CHECKPOINT_PREFIX = "_datadog_"
6565

6666

6767
def _dsm_set_checkpoint(context_json, event_type, arn):
@@ -547,60 +547,6 @@ def extract_context_from_step_functions(event, lambda_context):
547547
return extract_context_from_lambda_context(lambda_context)
548548

549549

550-
def _extract_context_from_durable_checkpoint(operation):
551-
# Checkpoint data is written by the dd-trace-py in Datadog style
552-
# (x-datadog-* headers). Extraction goes through the standard
553-
# propagator.extract path, which honors DD_TRACE_PROPAGATION_STYLE_EXTRACT.
554-
# The default extract list (datadog, tracecontext, baggage) already
555-
# includes datadog. Customers who override the extract list MUST keep
556-
# datadog in it.
557-
if not isinstance(operation, dict):
558-
return None
559-
560-
step_details = operation.get("StepDetails")
561-
if not isinstance(step_details, dict):
562-
return None
563-
564-
result = step_details.get("Result")
565-
if isinstance(result, str):
566-
try:
567-
result = json.loads(result)
568-
except Exception:
569-
return None
570-
571-
if not isinstance(result, dict):
572-
return None
573-
574-
return propagator.extract(result)
575-
576-
577-
def extract_context_from_durable_execution(event):
578-
operations = event.get("InitialExecutionState", {}).get("Operations")
579-
if isinstance(operations, dict):
580-
operations = list(operations.values())
581-
if not isinstance(operations, list) or not operations:
582-
return None
583-
584-
highest = -1
585-
best_operation = None
586-
for operation in operations:
587-
if not isinstance(operation, dict):
588-
continue
589-
name = operation.get("Name")
590-
if not isinstance(name, str) or not name.startswith(_TRACE_CHECKPOINT_PREFIX):
591-
continue
592-
suffix = name[len(_TRACE_CHECKPOINT_PREFIX) :]
593-
try:
594-
number = int(suffix)
595-
except (TypeError, ValueError):
596-
continue
597-
if number > highest:
598-
highest = number
599-
best_operation = operation
600-
601-
return _extract_context_from_durable_checkpoint(best_operation)
602-
603-
604550
def extract_context_custom_extractor(extractor, event, lambda_context):
605551
"""
606552
Extract Datadog trace context using a custom trace extractor function

0 commit comments

Comments
 (0)