Skip to content

Commit a36c815

Browse files
committed
initial implementation
1 parent d65d133 commit a36c815

10 files changed

Lines changed: 668 additions & 11 deletions

File tree

examples/cli.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,25 +44,34 @@ def build_examples():
4444
shutil.rmtree(build_dir)
4545
build_dir.mkdir()
4646

47+
# install all dependencies
48+
os.system("pip install -t ./examples/build .")
49+
4750
# Copy testing library from current environment
4851
try:
4952
import aws_durable_execution_sdk_python_testing
5053

51-
sdk_path = Path(aws_durable_execution_sdk_python_testing.__file__).parent
52-
logger.info("Copying SDK from %s", sdk_path)
54+
testing_path = Path(aws_durable_execution_sdk_python_testing.__file__).parent
55+
logger.info("Copying testing library from %s", testing_path)
56+
shutil.rmtree(
57+
build_dir / "aws_durable_execution_sdk_python_testing", ignore_errors=True
58+
)
5359
shutil.copytree(
54-
sdk_path, build_dir / "aws_durable_execution_sdk_python_testing"
60+
testing_path, build_dir / "aws_durable_execution_sdk_python_testing"
5561
)
5662
except (ImportError, OSError):
5763
logger.exception("Failed to copy testing library")
5864
return False
5965

6066
# Copy testing SDK source
61-
testing_src = (
62-
Path(__file__).parent.parent / "src" / "aws_durable_execution_sdk_python"
63-
)
64-
logger.info("Copying SDK from %s", testing_src)
65-
shutil.copytree(testing_src, build_dir / "aws_durable_execution_sdk_python")
67+
for subdir in [
68+
"aws_durable_execution_sdk_python",
69+
"aws_durable_execution_sdk_python_otel",
70+
]:
71+
sdk_src = Path(__file__).parent.parent / "src" / subdir
72+
logger.info("Copying SDK folder from %s", sdk_src)
73+
shutil.rmtree(build_dir / subdir, ignore_errors=True)
74+
shutil.copytree(sdk_src, build_dir / subdir)
6675

6776
# Copy example functions
6877
logger.info("Copying examples from %s", src_dir)
@@ -328,7 +337,11 @@ def deploy_function(example_name: str, function_name: str | None = None):
328337
"Timeout": 60,
329338
"MemorySize": 128,
330339
"Environment": {
331-
"Variables": {"AWS_ENDPOINT_URL_LAMBDA": config["lambda_endpoint"]}
340+
"Variables": {
341+
"AWS_ENDPOINT_URL_LAMBDA": config["lambda_endpoint"],
342+
"OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED": "true",
343+
"AWS_LAMBDA_EXEC_WRAPPER": "/opt/otel-instrument",
344+
}
332345
},
333346
"DurableConfig": example_config["durableConfig"],
334347
"LoggingConfig": example_config.get("loggingConfig", {}),

examples/examples-catalog.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,17 @@
591591
"ExecutionTimeout": 300
592592
},
593593
"path": "./src/plugin/execution_with_plugin.py"
594+
},
595+
{
596+
"name": "Otel Plugin",
597+
"description": "Test Otel plugin",
598+
"handler": "execution_with_otel.handler",
599+
"integration": true,
600+
"durableConfig": {
601+
"RetentionPeriodInDays": 7,
602+
"ExecutionTimeout": 300
603+
},
604+
"path": "./src/plugin/execution_with_otel.py"
594605
}
595606
]
596607
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""Demonstrates handler execution without any durable operations."""
2+
3+
import logging
4+
from typing import Any
5+
6+
from aws_durable_execution_sdk_python import StepContext
7+
from aws_durable_execution_sdk_python.context import (
8+
DurableContext,
9+
durable_step,
10+
durable_with_child_context,
11+
)
12+
from aws_durable_execution_sdk_python.execution import durable_execution
13+
14+
from aws_durable_execution_sdk_python_otel import DurableExecutionOtelPlugin
15+
16+
17+
otel = DurableExecutionOtelPlugin()
18+
19+
20+
@durable_step
21+
def add_numbers(_step_context: StepContext, a: int, b: int) -> int:
22+
return a + b
23+
24+
25+
@durable_with_child_context
26+
def add_numbers_in_child(child_context: DurableContext, a: int, b: int):
27+
result: int = child_context.step(
28+
add_numbers(a, b),
29+
name="add-a-and-b",
30+
)
31+
return result
32+
33+
34+
@durable_execution(plugins=[otel])
35+
def handler(_event: Any, context: DurableContext) -> int:
36+
result: int = context.run_in_child_context(
37+
add_numbers_in_child(6, 4),
38+
name="add-6-and-4",
39+
)
40+
return context.step(
41+
add_numbers(result, 2),
42+
name="add-result-to-2",
43+
)

examples/template.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -959,6 +959,24 @@
959959
"ExecutionTimeout": 300
960960
}
961961
}
962+
},
963+
"ExecutionWithOtel": {
964+
"Type": "AWS::Serverless::Function",
965+
"Properties": {
966+
"CodeUri": "build/",
967+
"Handler": "execution_with_otel.handler",
968+
"Description": "Test Otel plugin",
969+
"Role": {
970+
"Fn::GetAtt": [
971+
"DurableFunctionRole",
972+
"Arn"
973+
]
974+
},
975+
"DurableConfig": {
976+
"RetentionPeriodInDays": 7,
977+
"ExecutionTimeout": 300
978+
}
979+
}
962980
}
963981
}
964982
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""Tests for step example."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
6+
from src.plugin import execution_with_otel
7+
from test.conftest import deserialize_operation_payload
8+
9+
10+
@pytest.mark.example
11+
@pytest.mark.durable_execution(
12+
handler=execution_with_otel.handler,
13+
lambda_function_name="Otel Plugin",
14+
)
15+
def test_plugin(durable_runner):
16+
"""Test basic step example."""
17+
with durable_runner:
18+
result = durable_runner.run(input="{}", timeout=10)
19+
20+
assert result.status is InvocationStatus.SUCCEEDED
21+
assert deserialize_operation_payload(result.result) == 12
22+
23+
step_result = result.get_step("add-result-to-2")
24+
assert deserialize_operation_payload(step_result.result) == 12

pyproject.toml

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,24 @@ classifiers = [
2121
"Programming Language :: Python :: Implementation :: CPython",
2222
"Programming Language :: Python :: Implementation :: PyPy",
2323
]
24-
dependencies = ["boto3>=1.42.1"]
24+
dependencies = [
25+
"boto3>=1.42.1",
26+
"opentelemetry-api",
27+
"opentelemetry-sdk",
28+
"opentelemetry-exporter-otlp",
29+
"opentelemetry-propagator-aws-xray",
30+
]
2531

2632
[project.urls]
2733
Documentation = "https://github.com/aws/aws-durable-execution-sdk-python#readme"
2834
Issues = "https://github.com/aws/aws-durable-execution-sdk-python/issues"
2935
Source = "https://github.com/aws/aws-durable-execution-sdk-python"
3036

3137
[tool.hatch.build.targets.wheel]
32-
packages = ["src/aws_durable_execution_sdk_python"]
38+
packages = [
39+
"src/aws_durable_execution_sdk_python",
40+
"src/aws_durable_execution_sdk_python_otel",
41+
]
3342

3443
[tool.hatch.version]
3544
path = "src/aws_durable_execution_sdk_python/__about__.py"
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
"""OpenTelemetry instrumentation for AWS Durable Execution SDK."""
2+
3+
from aws_durable_execution_sdk_python_otel.context_extractors import (
4+
ContextExtractor,
5+
w3c_client_context_extractor,
6+
xray_context_extractor,
7+
)
8+
from aws_durable_execution_sdk_python_otel.deterministic_id_generator import (
9+
DeterministicIdGenerator,
10+
)
11+
from aws_durable_execution_sdk_python_otel.plugin import (
12+
DurableExecutionOtelPlugin,
13+
)
14+
15+
16+
__all__ = [
17+
"ContextExtractor",
18+
"DeterministicIdGenerator",
19+
"DurableExecutionOtelPlugin",
20+
"w3c_client_context_extractor",
21+
"xray_context_extractor",
22+
]
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""Context extractors for propagating trace context into durable executions."""
2+
3+
from __future__ import annotations
4+
5+
import os
6+
from typing import TYPE_CHECKING, Callable
7+
8+
from opentelemetry import context as otel_context, propagate
9+
10+
11+
if TYPE_CHECKING:
12+
from opentelemetry.context import Context
13+
14+
from aws_durable_execution_sdk_python.plugin import InvocationStartInfo
15+
16+
ContextExtractor = Callable[["InvocationStartInfo"], "Context"]
17+
18+
19+
def xray_context_extractor(info: "InvocationStartInfo") -> "Context":
20+
"""Read the X-Ray trace header from the _X_AMZN_TRACE_ID environment variable.
21+
22+
The durable execution backend propagates the same Root trace ID to every
23+
invocation, so all invocations share one traceId.
24+
"""
25+
trace_header = os.environ.get("_X_AMZN_TRACE_ID")
26+
if not trace_header:
27+
return otel_context.get_current()
28+
return propagate.extract(
29+
carrier={"X-Amzn-Trace-Id": trace_header},
30+
context=otel_context.get_current(),
31+
)
32+
33+
34+
def w3c_client_context_extractor(info: "InvocationStartInfo") -> "Context":
35+
"""Read W3C traceparent from context.clientContext.custom.traceparent.
36+
37+
Requires the backend clientContext propagation to be enabled.
38+
This extractor is a placeholder for when backend propagation is supported.
39+
"""
40+
return otel_context.get_current()
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
"""Deterministic ID generator for OpenTelemetry spans in durable executions."""
2+
3+
from __future__ import annotations
4+
5+
import hashlib
6+
import os
7+
import re
8+
from datetime import datetime, UTC
9+
10+
from opentelemetry.sdk.trace import IdGenerator, RandomIdGenerator
11+
12+
HASH_LENGTH = 16
13+
HASHED_ID_PATTERN = re.compile(r"^[0-9a-f]{16}$")
14+
15+
16+
def hash_id(input_str: str) -> str:
17+
"""Create an MD5 hash of the input string, truncated to 16 hex chars.
18+
19+
This matches the JS SDK's hashId function used for operation IDs.
20+
"""
21+
return hashlib.md5(input_str.encode()).hexdigest()[:HASH_LENGTH] # noqa: S324
22+
23+
24+
def _parse_xray_root_trace_id(trace_header: str | None) -> str | None:
25+
"""Parse the Root trace ID from an X-Ray trace header string.
26+
27+
The header format is:
28+
Root=1-<8 hex>-<24 hex>;Parent=<16 hex>;Sampled=0|1
29+
30+
Returns the root value (e.g. "1-5759e988-bd862e3fe1be46a994272793")
31+
or None if the header is missing or malformed.
32+
"""
33+
if not trace_header:
34+
return None
35+
match = re.search(r"Root=(1-[0-9a-fA-F]{8}-[0-9a-fA-F]{24})", trace_header)
36+
return match.group(1) if match else None
37+
38+
39+
def _xray_trace_id_to_otel(xray_trace_id: str) -> int:
40+
"""Convert an X-Ray trace ID to the W3C/OpenTelemetry 32-char hex format.
41+
42+
X-Ray format: "1-<8hex>-<24hex>" (36 chars with prefix and dashes)
43+
OTel format: "<8hex><24hex>" (32 lowercase hex chars)
44+
"""
45+
otel_id = xray_trace_id.replace("1-", "", 1).replace("-", "").lower()
46+
return int(otel_id, 16)
47+
48+
49+
def _to_otel_trace_id(execution_arn: str, start_timestamp: datetime | None) -> int:
50+
"""Build an OTel-compatible trace ID (128 bits)
51+
52+
First attempts to read the trace ID from the _X_AMZN_TRACE_ID environment
53+
variable that Lambda populates on each invocation. This ties the durable
54+
execution spans to the same trace that X-Ray is already tracking.
55+
56+
Falls back to generating a deterministic trace ID from the execution ARN
57+
and timestamp when the environment variable is not set (e.g. in tests or
58+
non-Lambda environments).
59+
"""
60+
env_trace_id = _parse_xray_root_trace_id(os.environ.get("_X_AMZN_TRACE_ID"))
61+
if env_trace_id:
62+
return _xray_trace_id_to_otel(env_trace_id)
63+
64+
# Fallback: deterministic ID from execution ARN + timestamp
65+
time_part = format(int((start_timestamp or datetime.now(UTC)).timestamp()), "08x")
66+
hash_part = hashlib.blake2b(execution_arn.encode()).hexdigest()[:24] # noqa: S324
67+
return int(f"{time_part}{hash_part}", 16)
68+
69+
70+
def operation_id_to_span_id(operation_id: str) -> int:
71+
"""Derive a deterministic span ID (64 bits) from an operation ID."""
72+
hashed_operation_id = hashlib.blake2b(operation_id.encode()).hexdigest()[:16]
73+
return int(hashed_operation_id, 16)
74+
75+
76+
class DeterministicIdGenerator(IdGenerator):
77+
"""An ID generator that produces deterministic span IDs when a pending
78+
operation ID is set, and random IDs otherwise.
79+
80+
Trace IDs are deterministic when an execution ARN is set, ensuring all
81+
invocations of the same durable execution share a single trace.
82+
83+
Trace IDs embed a real timestamp so they satisfy the X-Ray format
84+
requirement (first 8 hex chars = Unix epoch seconds).
85+
"""
86+
87+
def __init__(self) -> None:
88+
self._next_span_id: int | None = None
89+
self._execution_trace_id: int | None = None
90+
self._random_id_generator = RandomIdGenerator()
91+
92+
def set_next_span_id(self, operation_id: str | None) -> None:
93+
"""Set the operation ID to use for the next span's ID.
94+
95+
After one span is created, it resets to random.
96+
"""
97+
self._next_span_id = (
98+
operation_id_to_span_id(operation_id) if operation_id else None
99+
)
100+
101+
def set_trace_id(
102+
self, execution_arn: str, start_timestamp: datetime | None
103+
) -> None:
104+
"""Compute and cache the deterministic trace ID for this execution.
105+
106+
Args:
107+
execution_arn: The durable execution ARN (used for the hash portion).
108+
start_timestamp: start time of invocation
109+
"""
110+
self._execution_trace_id = _to_otel_trace_id(execution_arn, start_timestamp)
111+
112+
def generate_trace_id(self) -> int:
113+
"""Generate a 128-bit trace ID."""
114+
if self._execution_trace_id:
115+
return self._execution_trace_id
116+
return self._random_id_generator.generate_trace_id()
117+
118+
def generate_span_id(self) -> int:
119+
"""Generate a 64-bit span ID."""
120+
if self._next_span_id:
121+
span_id = self._next_span_id
122+
self._next_span_id = None
123+
return span_id
124+
return self._random_id_generator.generate_span_id()

0 commit comments

Comments
 (0)