|
4 | 4 | # Copyright 2019 Datadog, Inc. |
5 | 5 | import logging |
6 | 6 | import re |
| 7 | +import ujson as json |
7 | 8 |
|
8 | 9 | logger = logging.getLogger(__name__) |
9 | 10 |
|
| 11 | +_TRACE_CHECKPOINT_PREFIX = "_datadog_" |
| 12 | + |
10 | 13 |
|
11 | 14 | def _parse_durable_execution_arn(arn): |
12 | 15 | """ |
@@ -56,6 +59,62 @@ def extract_durable_function_tags(event): |
56 | 59 | VALID_DURABLE_STATUSES = {"SUCCEEDED", "FAILED", "PENDING"} |
57 | 60 |
|
58 | 61 |
|
| 62 | +def _extract_context_from_durable_checkpoint(operation): |
| 63 | + # Checkpoint data is written by the dd-trace-py in Datadog style |
| 64 | + # (x-datadog-* headers). Extraction goes through the standard |
| 65 | + # propagator.extract path, which honors DD_TRACE_PROPAGATION_STYLE_EXTRACT. |
| 66 | + # The default extract list (datadog, tracecontext, baggage) already |
| 67 | + # includes datadog. Customers who override the extract list MUST keep |
| 68 | + # datadog in it. |
| 69 | + if not isinstance(operation, dict): |
| 70 | + return None |
| 71 | + |
| 72 | + step_details = operation.get("StepDetails") |
| 73 | + if not isinstance(step_details, dict): |
| 74 | + return None |
| 75 | + |
| 76 | + result = step_details.get("Result") |
| 77 | + if isinstance(result, str): |
| 78 | + try: |
| 79 | + result = json.loads(result) |
| 80 | + except Exception: |
| 81 | + return None |
| 82 | + |
| 83 | + if not isinstance(result, dict): |
| 84 | + return None |
| 85 | + |
| 86 | + from datadog_lambda.tracing import propagator |
| 87 | + |
| 88 | + return propagator.extract(result) |
| 89 | + |
| 90 | + |
| 91 | +def extract_context_from_durable_execution(event): |
| 92 | + operations = event.get("InitialExecutionState", {}).get("Operations") |
| 93 | + if isinstance(operations, dict): |
| 94 | + operations = list(operations.values()) |
| 95 | + if not isinstance(operations, list) or not operations: |
| 96 | + return None |
| 97 | + |
| 98 | + highest = -1 |
| 99 | + best_operation = None |
| 100 | + for operation in operations: |
| 101 | + if not isinstance(operation, dict): |
| 102 | + continue |
| 103 | + name = operation.get("Name") |
| 104 | + if not isinstance(name, str) or not name.startswith(_TRACE_CHECKPOINT_PREFIX): |
| 105 | + continue |
| 106 | + suffix = name[len(_TRACE_CHECKPOINT_PREFIX) :] |
| 107 | + try: |
| 108 | + number = int(suffix) |
| 109 | + except (TypeError, ValueError): |
| 110 | + continue |
| 111 | + if number > highest: |
| 112 | + highest = number |
| 113 | + best_operation = operation |
| 114 | + |
| 115 | + return _extract_context_from_durable_checkpoint(best_operation) |
| 116 | + |
| 117 | + |
59 | 118 | def extract_durable_execution_status(response, event): |
60 | 119 | if not isinstance(event, dict) or "DurableExecutionArn" not in event: |
61 | 120 | return None |
|
0 commit comments