Skip to content

Commit a65fb4e

Browse files
authored
feat: add otel plugin (#386)
1 parent 3e16866 commit a65fb4e

14 files changed

Lines changed: 1162 additions & 11 deletions

File tree

packages/aws-durable-execution-sdk-python-examples/cli.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import os
77
import shutil
8+
import subprocess
89
import sys
910
import time
1011
import zipfile
@@ -35,6 +36,7 @@ def build_examples():
3536

3637
build_dir = Path(__file__).parent / "build"
3738
src_dir = Path(__file__).parent / "src"
39+
packages_dir = Path(__file__).parent.parent
3840

3941
logger.info("Building examples...")
4042

@@ -57,15 +59,29 @@ def build_examples():
5759
logger.exception("Failed to copy testing library")
5860
return False
5961

60-
# Copy SDK source from the main SDK package
61-
testing_src = (
62-
Path(__file__).parent.parent
63-
/ "aws-durable-execution-sdk-python"
64-
/ "src"
65-
/ "aws_durable_execution_sdk_python"
66-
)
67-
logger.info("Copying SDK from %s", testing_src)
68-
shutil.copytree(testing_src, build_dir / "aws_durable_execution_sdk_python")
62+
# Install local packages so their runtime dependencies are included in
63+
# the Lambda deployment package.
64+
runtime_packages = [
65+
packages_dir / "aws-durable-execution-sdk-python",
66+
packages_dir / "aws-durable-execution-sdk-python-otel",
67+
]
68+
try:
69+
subprocess.run(
70+
[
71+
sys.executable,
72+
"-m",
73+
"pip",
74+
"install",
75+
"--upgrade",
76+
"--target",
77+
str(build_dir),
78+
*[str(package) for package in runtime_packages],
79+
],
80+
check=True,
81+
)
82+
except subprocess.CalledProcessError:
83+
logger.exception("Failed to install runtime dependencies")
84+
return False
6985

7086
# Copy example functions
7187
logger.info("Copying examples from %s", src_dir)

packages/aws-durable-execution-sdk-python-examples/examples-catalog.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,17 @@
613613
"ExecutionTimeout": 300
614614
},
615615
"path": "./src/plugin/execution_with_plugin.py"
616+
},
617+
{
618+
"name": "Otel Plugin",
619+
"description": "Test Otel plugin",
620+
"handler": "execution_with_otel.handler",
621+
"integration": true,
622+
"durableConfig": {
623+
"RetentionPeriodInDays": 7,
624+
"ExecutionTimeout": 300
625+
},
626+
"path": "./src/plugin/execution_with_otel.py"
616627
}
617628
]
618629
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
"""Demonstrates tracing durable steps, waits, and child contexts with the OpenTelemetry plugin."""
2+
3+
from typing import Any
4+
5+
from opentelemetry import trace
6+
7+
from aws_durable_execution_sdk_python import StepContext
8+
from aws_durable_execution_sdk_python.config import Duration, StepConfig, StepSemantics
9+
from aws_durable_execution_sdk_python.context import (
10+
DurableContext,
11+
durable_step,
12+
durable_with_child_context,
13+
)
14+
from aws_durable_execution_sdk_python.execution import durable_execution
15+
16+
from aws_durable_execution_sdk_python_otel import DurableExecutionOtelPlugin
17+
18+
19+
# use default provider
20+
tracer_provider = trace.get_tracer_provider()
21+
otel = DurableExecutionOtelPlugin(tracer_provider)
22+
23+
24+
@durable_step
def add_numbers(_step_context: StepContext, a: int, b: int) -> int:
    """Durable step that returns the sum of ``a`` and ``b``.

    The step context is accepted but not used by this step.
    """
    total = a + b
    return total
27+
28+
29+
@durable_with_child_context
def add_numbers_in_child(child_context: DurableContext, a: int, b: int):
    """Add ``a`` and ``b`` in a step, then wait one second, inside a child context.

    The step is named ``step-<b>`` and the wait ``wait-<b>`` so each call made
    with a distinct ``b`` produces distinctly named operations.

    Returns:
        The value produced by the ``add_numbers`` step.
    """
    step_name = f"step-{b}"
    wait_name = f"wait-{b}"

    total: int = child_context.step(add_numbers(a, b), name=step_name)
    child_context.wait(Duration.from_seconds(1), name=wait_name)
    return total
40+
41+
42+
@durable_execution(plugins=[otel])
def handler(_event: Any, context: DurableContext) -> int:
    """Durable entry point instrumented with the OTel plugin.

    Runs three child contexts (``context-0`` .. ``context-2``), each adding 6
    to its index, then adds 2 to the accumulated total in ``final-step``.

    Returns:
        The result of the ``final-step`` step.
    """
    # The generator is consumed in order, so the child contexts run
    # sequentially for i = 0, 1, 2.
    subtotal = sum(
        context.run_in_child_context(
            add_numbers_in_child(6, i),
            name=f"context-{i}",
        )
        for i in range(3)
    )
    final_step = add_numbers(subtotal, 2)
    return context.step(final_step, name="final-step")

packages/aws-durable-execution-sdk-python-examples/template.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,24 @@
995995
"ExecutionTimeout": 300
996996
}
997997
}
998+
},
999+
"ExecutionWithOtel": {
1000+
"Type": "AWS::Serverless::Function",
1001+
"Properties": {
1002+
"CodeUri": "build/",
1003+
"Handler": "execution_with_otel.handler",
1004+
"Description": "Test Otel plugin",
1005+
"Role": {
1006+
"Fn::GetAtt": [
1007+
"DurableFunctionRole",
1008+
"Arn"
1009+
]
1010+
},
1011+
"DurableConfig": {
1012+
"RetentionPeriodInDays": 7,
1013+
"ExecutionTimeout": 300
1014+
}
1015+
}
9981016
}
9991017
}
10001018
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""Tests for the OpenTelemetry plugin example."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
6+
from src.plugin import execution_with_otel
7+
from test.conftest import deserialize_operation_payload
8+
9+
10+
@pytest.mark.example
@pytest.mark.durable_execution(
    handler=execution_with_otel.handler,
    lambda_function_name="Otel Plugin",
)
def test_plugin(durable_runner):
    """Run the OTel plugin example and check the execution and final-step results.

    Both the overall result and the ``final-step`` result are expected to
    deserialize to 23.
    """
    expected_total = 23

    with durable_runner:
        outcome = durable_runner.run(input="{}", timeout=10)

    assert outcome.status is InvocationStatus.SUCCEEDED
    assert deserialize_operation_payload(outcome.result) == expected_total

    final_step = outcome.get_step("final-step")
    assert deserialize_operation_payload(final_step.result) == expected_total

packages/aws-durable-execution-sdk-python-otel/pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ dependencies = [
2525
"aws-durable-execution-sdk-python>=1.5.0",
2626
"opentelemetry-api>=1.20.0",
2727
"opentelemetry-sdk>=1.20.0",
28+
"opentelemetry-exporter-otlp",
29+
"opentelemetry-propagator-aws-xray",
2830
]
2931

3032
[project.urls]
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,24 @@
11
"""OpenTelemetry instrumentation for AWS Lambda Durable Executions Python SDK."""
22

33
from aws_durable_execution_sdk_python_otel.__about__ import __version__
4+
from aws_durable_execution_sdk_python_otel.context_extractors import (
5+
ContextExtractor,
6+
w3c_client_context_extractor,
7+
xray_context_extractor,
8+
)
9+
from aws_durable_execution_sdk_python_otel.deterministic_id_generator import (
10+
DeterministicIdGenerator,
11+
)
12+
from aws_durable_execution_sdk_python_otel.plugin import (
13+
DurableExecutionOtelPlugin,
14+
)
415

516

617
__all__ = [
718
"__version__",
19+
"ContextExtractor",
20+
"DeterministicIdGenerator",
21+
"DurableExecutionOtelPlugin",
22+
"w3c_client_context_extractor",
23+
"xray_context_extractor",
824
]
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""Context extractors for propagating trace context into durable executions."""
2+
3+
from __future__ import annotations
4+
5+
import os
6+
from typing import TYPE_CHECKING, Callable
7+
8+
from opentelemetry import context as otel_context, propagate
9+
10+
11+
if TYPE_CHECKING:
12+
from opentelemetry.context import Context
13+
14+
from aws_durable_execution_sdk_python.plugin import InvocationStartInfo
15+
16+
ContextExtractor = Callable[["InvocationStartInfo"], "Context"]
17+
18+
19+
def xray_context_extractor(info: "InvocationStartInfo") -> "Context":
    """Build an OTel context from the ``_X_AMZN_TRACE_ID`` environment variable.

    The variable's value is passed to ``propagate.extract`` as an
    ``X-Amzn-Trace-Id`` carrier entry on top of the current context. When the
    variable is unset or empty, the current context is returned unchanged.

    The durable execution backend is expected to propagate the same Root trace
    ID to every invocation, so all invocations share one trace ID.

    NOTE(review): ``propagate.extract`` uses the globally configured
    propagator — confirm an X-Ray-aware propagator is installed wherever this
    extractor is used, otherwise the header is presumably ignored.

    Args:
        info: Invocation start details; not consulted by this extractor.

    Returns:
        The extracted context, or the current context when no header is set.
    """
    header_value = os.environ.get("_X_AMZN_TRACE_ID")
    current = otel_context.get_current()
    if header_value:
        carrier = {"X-Amzn-Trace-Id": header_value}
        return propagate.extract(carrier=carrier, context=current)
    return current
32+
33+
34+
def w3c_client_context_extractor(info: "InvocationStartInfo") -> "Context":
    """Placeholder extractor for a W3C ``traceparent`` carried in the client context.

    Intended to read ``context.clientContext.custom.traceparent`` once the
    backend supports clientContext propagation. For now it does not inspect
    ``info`` and simply returns the current OTel context.
    """
    current = otel_context.get_current()
    return current
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
"""Deterministic ID generator for OpenTelemetry spans in durable executions."""
2+
3+
from __future__ import annotations
4+
5+
import hashlib
6+
import os
7+
import re
8+
from datetime import datetime, UTC
9+
10+
from opentelemetry.sdk.trace import IdGenerator, RandomIdGenerator
11+
12+
HASH_LENGTH = 16
13+
HASHED_ID_PATTERN = re.compile(r"^[0-9a-f]{16}$")
14+
15+
16+
def _parse_xray_root_trace_id(trace_header: str | None) -> str | None:
    """Extract the ``Root`` trace ID from an X-Ray trace header.

    The header is expected to look like::

        Root=1-<8 hex>-<24 hex>;Parent=<16 hex>;Sampled=0|1

    Args:
        trace_header: Raw header value, or ``None`` when unavailable.

    Returns:
        The root value (e.g. ``"1-5759e988-bd862e3fe1be46a994272793"``), or
        ``None`` when the header is missing, empty, or contains no
        well-formed ``Root`` field.
    """
    if not trace_header:
        return None

    root_field = re.search(
        r"Root=(1-[0-9a-fA-F]{8}-[0-9a-fA-F]{24})", trace_header
    )
    if root_field is None:
        return None
    return root_field.group(1)
29+
30+
31+
def _xray_trace_id_to_otel(xray_trace_id: str) -> int:
    """Convert an X-Ray trace ID into an OpenTelemetry trace ID integer.

    X-Ray format: ``"1-<8hex>-<24hex>"`` (version prefix and dashes).
    The first ``"1-"`` and all remaining dashes are dropped, leaving the
    32 hex characters ``<8hex><24hex>``, which are parsed as base 16.

    Args:
        xray_trace_id: Trace ID in X-Ray format.

    Returns:
        The trace ID as an integer.
    """
    without_version = xray_trace_id.replace("1-", "", 1)
    hex_digits = "".join(without_version.split("-")).lower()
    return int(hex_digits, 16)
39+
40+
41+
def _to_otel_trace_id(execution_arn: str, start_timestamp: datetime | None) -> int:
    """Build an OTel-compatible trace ID (128 bits).

    The Root trace ID in the ``_X_AMZN_TRACE_ID`` environment variable is
    preferred when it parses successfully, which ties the durable execution
    spans to the trace X-Ray is already tracking.

    Otherwise (e.g. in tests or non-Lambda environments) the ID is derived
    from the inputs: 8 hex characters of epoch seconds followed by the first
    24 hex characters of the BLAKE2b hash of the execution ARN.

    Args:
        execution_arn: The durable execution ARN, hashed for the fallback ID.
        start_timestamp: Start time used for the fallback ID's time portion;
            the current UTC time is used when this is ``None``.

    Returns:
        The trace ID as an integer.
    """
    xray_root = _parse_xray_root_trace_id(os.environ.get("_X_AMZN_TRACE_ID"))
    if xray_root:
        return _xray_trace_id_to_otel(xray_root)

    # Fallback: deterministic ID from execution ARN + timestamp
    started_at = start_timestamp or datetime.now(UTC)
    epoch_hex = format(int(started_at.timestamp()), "08x")
    arn_hex = hashlib.blake2b(execution_arn.encode()).hexdigest()[:24]  # noqa: S324
    return int(epoch_hex + arn_hex, 16)
60+
61+
62+
def operation_id_to_span_id(operation_id: str) -> int:
    """Derive a deterministic 64-bit span ID from an operation ID.

    The ID is the first 8 bytes of the BLAKE2b digest of the encoded
    operation ID, read as a big-endian integer (the same value as parsing
    the first 16 hex characters of the hex digest).
    """
    digest = hashlib.blake2b(operation_id.encode()).digest()
    return int.from_bytes(digest[:8], "big")
66+
67+
68+
class DeterministicIdGenerator(IdGenerator):
    """ID generator that returns preset IDs when available and random IDs otherwise.

    Span IDs: a value queued with ``set_next_span_id`` is used for exactly one
    span; every other span gets a random ID.

    Trace IDs: once ``set_trace_id`` has been called, the cached value is
    returned for every trace, so all invocations of the same durable execution
    can share a single trace. Before that, trace IDs are random.
    """

    def __init__(self) -> None:
        # Source of IDs whenever no preset value is available.
        self._random_id_generator = RandomIdGenerator()
        # One-shot span ID consumed by the next generate_span_id() call.
        self._next_span_id: int | None = None
        # Trace ID cached by set_trace_id(); None until then.
        self._execution_trace_id: int | None = None

    def set_next_span_id(self, span_id: int | None) -> None:
        """Queue the span ID to hand out for the next span.

        The value is consumed by a single ``generate_span_id`` call; later
        spans go back to random IDs. Passing ``None`` clears any queued value.
        """
        self._next_span_id = span_id

    def set_trace_id(
        self, execution_arn: str, start_timestamp: datetime | None
    ) -> None:
        """Compute and cache the trace ID for this execution.

        Args:
            execution_arn: The durable execution ARN.
            start_timestamp: Start time of the invocation.
        """
        trace_id = _to_otel_trace_id(execution_arn, start_timestamp)
        self._execution_trace_id = trace_id

    def generate_trace_id(self) -> int:
        """Return the cached trace ID, or a random 128-bit one if none is set."""
        cached = self._execution_trace_id
        # Truthiness check: an unset (None) or zero ID falls back to random.
        if cached:
            return cached
        return self._random_id_generator.generate_trace_id()

    def generate_span_id(self) -> int:
        """Return the queued span ID once, or a random 64-bit one otherwise."""
        pending = self._next_span_id
        self._next_span_id = None
        # Truthiness check: an unset (None) or zero ID falls back to random.
        if pending:
            return pending
        return self._random_id_generator.generate_span_id()

0 commit comments

Comments
 (0)