Skip to content

Commit 5f4e687

Browse files
authored
Merge branch 'main' into fix/remove-invoke-config-timeout
2 parents 5b00b47 + fbd48f1 commit 5f4e687

6 files changed

Lines changed: 260 additions & 34 deletions

File tree

packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/deterministic_id_generator.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
import hashlib
66
import os
77
import re
8-
from datetime import datetime, UTC
8+
from datetime import UTC, datetime
9+
10+
from opentelemetry.sdk.trace import IdGenerator, RandomIdGenerator
911

10-
from opentelemetry.sdk.trace import RandomIdGenerator
1112

12-
HASH_LENGTH = 16
1313
HASHED_ID_PATTERN = re.compile(r"^[0-9a-f]{16}$")
1414

1515

@@ -67,19 +67,25 @@ def operation_id_to_span_id(operation_id: str) -> int:
6767

6868
class DeterministicIdGenerator(RandomIdGenerator):
6969
"""An ID generator that produces deterministic span IDs when a pending
70-
operation ID is set, and random IDs otherwise.
70+
operation ID is set, and falls back to the provided generator otherwise.
7171
7272
Trace IDs are deterministic when an execution ARN is set, ensuring all
73-
invocations of the same durable execution share a single trace.
73+
invocations of the same durable execution share a single trace. When no
74+
deterministic ID is available, generation is delegated to the fallback
75+
generator (the tracer provider's original ID generator by default).
7476
7577
Trace IDs embed a real timestamp so they satisfy the X-Ray format
7678
requirement (first 8 hex chars = Unix epoch seconds).
79+
80+
Args:
81+
fallback_id_generator: Generator used when no deterministic ID is
82+
available. Defaults to a new ``RandomIdGenerator``.
7783
"""
7884

79-
def __init__(self) -> None:
85+
def __init__(self, fallback_id_generator: IdGenerator | None = None) -> None:
8086
self._next_span_id: int | None = None
8187
self._execution_trace_id: int | None = None
82-
self._random_id_generator = RandomIdGenerator()
88+
self._fallback_id_generator = fallback_id_generator or RandomIdGenerator()
8389

8490
def set_next_span_id(self, span_id: int | None) -> None:
8591
"""Set the operation ID to use for the next span's ID.
@@ -101,9 +107,11 @@ def set_trace_id(
101107

102108
def generate_trace_id(self) -> int:
103109
"""Generate a 128-bit trace ID."""
104-
return self._execution_trace_id or self._random_id_generator.generate_trace_id()
110+
return (
111+
self._execution_trace_id or self._fallback_id_generator.generate_trace_id()
112+
)
105113

106114
def generate_span_id(self) -> int:
107115
"""Generate a 64-bit span ID."""
108116
span_id, self._next_span_id = self._next_span_id, None
109-
return span_id or self._random_id_generator.generate_span_id()
117+
return span_id or self._fallback_id_generator.generate_span_id()

packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,31 @@
77
import threading
88
from typing import TYPE_CHECKING, Any
99

10-
from opentelemetry import trace, context
11-
from opentelemetry.context import Context
12-
from opentelemetry.sdk.trace import TracerProvider as SdkTracerProvider
13-
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
14-
from opentelemetry.trace import (
15-
Tracer,
16-
StatusCode,
17-
SpanContext,
18-
Span,
19-
Link,
20-
TraceFlags,
21-
)
22-
2310
from aws_durable_execution_sdk_python.lambda_service import OperationType
2411
from aws_durable_execution_sdk_python.plugin import (
2512
DurableInstrumentationPlugin,
2613
InvocationEndInfo,
2714
InvocationStartInfo,
2815
OperationEndInfo,
2916
OperationStartInfo,
30-
UserFunctionStartInfo,
3117
UserFunctionEndInfo,
3218
UserFunctionOutcome,
19+
UserFunctionStartInfo,
3320
)
21+
from opentelemetry import context, trace
22+
from opentelemetry.context import Context
23+
from opentelemetry.sdk.trace import TracerProvider
24+
from opentelemetry.sdk.trace import TracerProvider as SdkTracerProvider
25+
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
26+
from opentelemetry.trace import (
27+
Link,
28+
Span,
29+
SpanContext,
30+
StatusCode,
31+
TraceFlags,
32+
Tracer,
33+
)
34+
3435
from aws_durable_execution_sdk_python_otel.context_extractors import (
3536
ContextExtractor,
3637
xray_context_extractor,
@@ -40,6 +41,7 @@
4041
operation_id_to_span_id,
4142
)
4243

44+
4345
if TYPE_CHECKING:
4446
pass
4547

@@ -95,9 +97,13 @@ def __init__(
9597
context_extractor or xray_context_extractor
9698
)
9799

98-
self._id_generator: DeterministicIdGenerator = DeterministicIdGenerator()
99-
100100
self._provider = trace_provider
101+
# A ProxyTracerProvider (the API default from trace.get_tracer_provider()
102+
# before an SDK provider is configured) has no id_generator; fall back to
103+
# None so DeterministicIdGenerator uses its own default generator.
104+
self._id_generator: DeterministicIdGenerator = DeterministicIdGenerator(
105+
fallback_id_generator=getattr(self._provider, "id_generator", None)
106+
)
101107
self._provider.id_generator = self._id_generator
102108
self._provider.sampler = TraceIdRatioBased(sampling_rate)
103109
self._tracer: Tracer = self._provider.get_tracer(instrument_name)

packages/aws-durable-execution-sdk-python-otel/tests/test_deterministic_id_generator.py

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
from datetime import UTC, datetime
66

7+
from opentelemetry.sdk.trace import IdGenerator, RandomIdGenerator
8+
79
from aws_durable_execution_sdk_python_otel.deterministic_id_generator import (
810
HASHED_ID_PATTERN,
911
DeterministicIdGenerator,
@@ -14,6 +16,20 @@
1416
)
1517

1618

19+
class _StubIdGenerator(IdGenerator):
    """Test double whose trace and span IDs are fixed at construction time."""

    def __init__(self, trace_id: int, span_id: int) -> None:
        # Store both canned values; each generate_* method hands back its own.
        self._trace_id, self._span_id = trace_id, span_id

    def generate_trace_id(self) -> int:
        """Return the canned trace ID on every call."""
        return self._trace_id

    def generate_span_id(self) -> int:
        """Return the canned span ID on every call."""
        return self._span_id
31+
32+
1733
def test_parse_xray_root_trace_id_returns_root_from_header():
1834
"""Verify X-Ray Root trace ID parsing ignores other header fields."""
1935
trace_header = (
@@ -94,7 +110,7 @@ def test_deterministic_id_generator_falls_back_to_random_trace_id(monkeypatch):
94110
expected_trace_id = int("1" * 32, 16)
95111
generator = DeterministicIdGenerator()
96112
monkeypatch.setattr(
97-
generator._random_id_generator,
113+
generator._fallback_id_generator,
98114
"generate_trace_id",
99115
lambda: expected_trace_id,
100116
)
@@ -108,7 +124,7 @@ def test_deterministic_id_generator_uses_next_span_id_once(monkeypatch):
108124
random_span_id = int("3" * 16, 16)
109125
generator = DeterministicIdGenerator()
110126
monkeypatch.setattr(
111-
generator._random_id_generator,
127+
generator._fallback_id_generator,
112128
"generate_span_id",
113129
lambda: random_span_id,
114130
)
@@ -124,11 +140,66 @@ def test_deterministic_id_generator_accepts_cleared_next_span_id(monkeypatch):
124140
expected_span_id = int("4" * 16, 16)
125141
generator = DeterministicIdGenerator()
126142
monkeypatch.setattr(
127-
generator._random_id_generator,
143+
generator._fallback_id_generator,
128144
"generate_span_id",
129145
lambda: expected_span_id,
130146
)
131147

132148
generator.set_next_span_id(None)
133149

134150
assert generator.generate_span_id() == expected_span_id
151+
152+
153+
def test_deterministic_id_generator_defaults_to_random_fallback():
    """A generator built without arguments falls back to a RandomIdGenerator."""
    default_fallback = DeterministicIdGenerator()._fallback_id_generator

    assert isinstance(default_fallback, RandomIdGenerator)
158+
159+
160+
def test_deterministic_id_generator_uses_provided_fallback_for_trace_id(monkeypatch):
    """With no execution trace ID set, trace IDs come from the supplied
    fallback generator."""
    monkeypatch.delenv("_X_AMZN_TRACE_ID", raising=False)
    stub_trace_id = int("a" * 32, 16)
    stub = _StubIdGenerator(trace_id=stub_trace_id, span_id=int("b" * 16, 16))

    produced = DeterministicIdGenerator(fallback_id_generator=stub).generate_trace_id()

    assert produced == stub_trace_id
168+
169+
170+
def test_deterministic_id_generator_uses_provided_fallback_for_span_id():
    """With no deterministic span ID pending, span IDs come from the supplied
    fallback generator."""
    stub_span_id = int("b" * 16, 16)
    stub = _StubIdGenerator(trace_id=int("a" * 32, 16), span_id=stub_span_id)

    produced = DeterministicIdGenerator(fallback_id_generator=stub).generate_span_id()

    assert produced == stub_span_id
177+
178+
179+
def test_deterministic_id_generator_prefers_execution_trace_id_over_fallback(
    monkeypatch,
):
    """A trace ID configured through set_trace_id wins over the fallback's."""
    monkeypatch.delenv("_X_AMZN_TRACE_ID", raising=False)
    arn = "arn:aws:lambda:us-west-2:123456789012:function:workflow:$LATEST"
    timestamp = datetime(2024, 1, 2, 3, 4, 5, tzinfo=UTC)
    expected_trace_id = int("65937d253aa8c3f7ffe36c50d65b1a6d", 16)
    generator = DeterministicIdGenerator(
        fallback_id_generator=_StubIdGenerator(
            trace_id=int("a" * 32, 16), span_id=int("b" * 16, 16)
        )
    )

    generator.set_trace_id(arn, timestamp)

    # The stub would have produced "a" * 32; the configured ID must be used.
    assert generator.generate_trace_id() == expected_trace_id
193+
194+
195+
def test_deterministic_id_generator_prefers_next_span_id_over_fallback():
    """A pending deterministic span ID is used once, ahead of the fallback."""
    pending_span_id = int("c" * 16, 16)
    stub_span_id = int("b" * 16, 16)
    generator = DeterministicIdGenerator(
        fallback_id_generator=_StubIdGenerator(
            trace_id=int("a" * 32, 16), span_id=stub_span_id
        )
    )

    generator.set_next_span_id(pending_span_id)
    first_span_id = generator.generate_span_id()
    second_span_id = generator.generate_span_id()

    assert first_span_id == pending_span_id
    # Once the pending ID is consumed, the provided generator takes over.
    assert second_span_id == stub_span_id

packages/aws-durable-execution-sdk-python-otel/tests/test_plugin.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,6 @@
55
from concurrent.futures import ThreadPoolExecutor
66
from datetime import UTC, datetime
77

8-
from opentelemetry.context import Context
9-
from opentelemetry.sdk.trace import TracerProvider
10-
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
11-
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
12-
138
from aws_durable_execution_sdk_python.lambda_service import (
149
InvocationStatus,
1510
OperationStatus,
@@ -24,6 +19,11 @@
2419
UserFunctionOutcome,
2520
UserFunctionStartInfo,
2621
)
22+
from opentelemetry.context import Context
23+
from opentelemetry.sdk.trace import TracerProvider
24+
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
25+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
26+
2727
from aws_durable_execution_sdk_python_otel.deterministic_id_generator import (
2828
operation_id_to_span_id,
2929
)
@@ -131,7 +131,9 @@ def test_operation_end_without_start_emits_continuation_span_with_link():
131131
plugin.on_invocation_start(_invocation_start_info())
132132
operation_id = "wait-existing"
133133
random_span_id = int("1234567890abcdef", 16)
134-
plugin._id_generator._random_id_generator.generate_span_id = lambda: random_span_id
134+
plugin._id_generator._fallback_id_generator.generate_span_id = lambda: (
135+
random_span_id
136+
)
135137

136138
plugin.on_operation_end(
137139
OperationEndInfo(

packages/aws-durable-execution-sdk-python/src/aws_durable_execution_sdk_python/retries.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,34 @@ def retry_strategy(error: Exception, attempts_made: int) -> RetryDecision:
125125
return retry_strategy
126126

127127

128+
def create_linear_retry_strategy(
    max_attempts: int = 6,
    initial_delay: Duration | None = None,
    increment: Duration | None = None,
) -> Callable[[Exception, int], RetryDecision]:
    """Build a retry strategy whose delay grows linearly with each attempt.

    The delay before the next attempt is
    ``initial_delay + increment * (attempts_made - 1)``. Mirrors the JS SDK's
    ``createLinearRetryStrategy``. With the defaults this yields delays of
    1s, 2s, 3s, 4s, 5s. No jitter is applied and there is no upper cap on the
    delay; callers who need either can build their own strategy via
    ``create_retry_strategy``.

    Args:
        max_attempts: Once ``attempts_made`` reaches this value the strategy
            stops retrying.
        initial_delay: Delay used after the first attempt. Defaults to 1s.
        increment: Amount added to the delay for each further attempt.
            Defaults to 1s.

    Returns:
        A callable taking ``(error, attempts_made)`` and returning a
        ``RetryDecision``.
    """
    initial: Duration = (
        initial_delay if initial_delay is not None else Duration.from_seconds(1)
    )
    step: Duration = increment if increment is not None else Duration.from_seconds(1)

    def linear_retry_strategy(_error: Exception, attempts_made: int) -> RetryDecision:
        if attempts_made >= max_attempts:
            return RetryDecision.no_retry()
        # Clamp the step count at zero: an attempts_made below 1 must not pull
        # the delay under initial_delay (or make it negative).
        steps_taken: int = max(attempts_made - 1, 0)
        delay_seconds: int = initial.to_seconds() + step.to_seconds() * steps_taken
        return RetryDecision.retry(Duration(seconds=delay_seconds))

    return linear_retry_strategy
154+
155+
128156
class RetryPresets:
129157
"""Default retry presets."""
130158

@@ -180,6 +208,31 @@ def critical(cls) -> Callable[[Exception, int], RetryDecision]:
180208
)
181209
)
182210

211+
@classmethod
def linear(cls) -> Callable[[Exception, int], RetryDecision]:
    """Return the linear preset: waits of 1s, 2s, 3s, 4s, 5s between retries."""
    first_delay = Duration.from_seconds(1)
    per_attempt_step = Duration.from_seconds(1)
    return create_linear_retry_strategy(
        max_attempts=6,
        initial_delay=first_delay,
        increment=per_attempt_step,
    )
219+
220+
@classmethod
def fixed(
    cls, interval: Duration | None = None
) -> Callable[[Exception, int], RetryDecision]:
    """Return a preset that waits the same interval before every retry.

    Args:
        interval: Delay between retries. Defaults to 5s when omitted.

    Returns:
        A strategy built by ``create_retry_strategy`` with jitter disabled.
    """
    if interval is None:
        interval = Duration.from_seconds(5)
    # Equal initial/max delay with a backoff rate of 1 keeps the delay constant.
    config = RetryStrategyConfig(
        max_attempts=5,
        initial_delay=interval,
        max_delay=interval,
        backoff_rate=1,
        jitter_strategy=JitterStrategy.NONE,
    )
    return create_retry_strategy(config)
235+
183236

184237
@dataclass(frozen=True)
185238
class WithRetryConfig(Generic[T]):

0 commit comments

Comments
 (0)