Skip to content

Commit 8225fda

Browse files
Alex Wang (wangyb-A)
authored and committed
feat: add otel logger injection
1 parent fbd48f1 commit 8225fda

13 files changed

Lines changed: 907 additions & 19 deletions

File tree

packages/aws-durable-execution-sdk-python-examples/examples-catalog.json

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,21 @@
635635
"ExecutionTimeout": 300
636636
},
637637
"path": "./src/plugin/execution_with_otel.py"
638+
},
639+
{
640+
"name": "Otel Logger Example",
641+
"description": "Demonstrates OTel-enriched logging correlated to durable spans",
642+
"handler": "otel_logger_example.handler",
643+
"integration": true,
644+
"durableConfig": {
645+
"RetentionPeriodInDays": 7,
646+
"ExecutionTimeout": 300
647+
},
648+
"loggingConfig": {
649+
"ApplicationLogLevel": "INFO",
650+
"LogFormat": "JSON"
651+
},
652+
"path": "./src/otel/otel_logger_example.py"
638653
}
639654
]
640655
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""Demonstrates OTel-enriched logging in a durable execution.
2+
3+
The DurableExecutionOtelPlugin wraps the execution logger (enrich_logger=True
4+
by default) so every log line emitted through context.logger / step_context.logger
5+
is automatically enriched with the active OpenTelemetry trace context
6+
(otel.trace_id, otel.span_id, otel.trace_sampled). This lets logs correlate to
7+
the spans the plugin emits without any user code changes.
8+
9+
Logs emitted:
10+
- at the top level correlate to the invocation span
11+
- inside a step correlate to that step's span
12+
- inside a child context correlate to the child-context span
13+
"""
14+
15+
from typing import Any
16+
17+
from aws_durable_execution_sdk_python_otel import DurableExecutionOtelPlugin
18+
from opentelemetry import trace
19+
from opentelemetry.sdk.trace import TracerProvider
20+
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
21+
22+
from aws_durable_execution_sdk_python import StepContext
23+
from aws_durable_execution_sdk_python.context import (
24+
DurableContext,
25+
durable_step,
26+
durable_with_child_context,
27+
)
28+
from aws_durable_execution_sdk_python.execution import durable_execution
29+
30+
31+
# Create the SDK tracer provider and install it as the global OTel provider.
# NOTE(review): BatchSpanProcessor and ConsoleSpanExporter are imported above,
# but no span processor is registered on this provider in this module --
# confirm whether spans are expected to be exported from this example.
tracer_provider = TracerProvider()
32+
trace.set_tracer_provider(tracer_provider)
33+
34+
# enrich_logger defaults to True, so the execution logger is wrapped with OTel
35+
# trace context injection (otel.trace_id, otel.span_id, otel.trace_sampled).
36+
# This plugin instance is passed to @durable_execution(plugins=[otel]) on the
# handler defined later in this module.
otel = DurableExecutionOtelPlugin(tracer_provider)
37+
38+
39+
@durable_step
def greet(step_context: StepContext, name: str) -> str:
    """Log a greeting from inside a durable step and return the greeting text.

    Args:
        step_context: Step context whose ``logger`` is used for the log line.
        name: Who to greet.

    Returns:
        The string ``"hello <name>"``.
    """
    # "name" is a reserved LogRecord key, so the value travels under a
    # different key in ``extra``.
    log_fields = {"greeting_name": name}
    step_context.logger.info("Greeting inside step", extra=log_fields)

    greeting = f"hello {name}"
    return greeting
45+
46+
47+
@durable_with_child_context
def greet_in_child(child_context: DurableContext, name: str) -> str:
    """Run the ``greet`` step inside a child context, logging on entry and exit.

    Args:
        child_context: The child durable context the step is executed on.
        name: Who to greet.

    Returns:
        The greeting produced by the nested ``child-greet`` step.
    """
    child_context.logger.info("Entering child context")

    greet_step = greet(name)
    greeting: str = child_context.step(greet_step, name="child-greet")

    exit_fields = {"result": greeting}
    child_context.logger.info("Leaving child context", extra=exit_fields)
    return greeting
54+
55+
56+
@durable_execution(plugins=[otel])
def handler(_event: Any, context: DurableContext) -> str:
    """Durable entry point: greet once at the top level and once in a child context.

    Logs a line before and after the two operations through ``context.logger``.

    Args:
        _event: Invocation payload; unused.
        context: The durable execution context.

    Returns:
        Both greetings joined as ``"<top> | <nested>"``.
    """
    context.logger.info("Workflow started")

    # Top-level step.
    top_greeting: str = context.step(greet("world"), name="top-greet")

    # Same step again, this time wrapped in a child context.
    child_task = greet_in_child("nested")
    nested_greeting: str = context.run_in_child_context(
        child_task, name="child-context"
    )

    summary_fields = {"top": top_greeting, "nested": nested_greeting}
    context.logger.info("Workflow completed", extra=summary_fields)
    return f"{top_greeting} | {nested_greeting}"

packages/aws-durable-execution-sdk-python-examples/template.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,24 @@
10311031
"ExecutionTimeout": 300
10321032
}
10331033
}
1034+
},
1035+
"OtelLoggerExample": {
1036+
"Type": "AWS::Serverless::Function",
1037+
"Properties": {
1038+
"CodeUri": "build/",
1039+
"Handler": "otel_logger_example.handler",
1040+
"Description": "Demonstrates OTel-enriched logging correlated to durable spans",
1041+
"Role": {
1042+
"Fn::GetAtt": [
1043+
"DurableFunctionRole",
1044+
"Arn"
1045+
]
1046+
},
1047+
"DurableConfig": {
1048+
"RetentionPeriodInDays": 7,
1049+
"ExecutionTimeout": 300
1050+
}
1051+
}
10341052
}
10351053
}
10361054
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
"""Tests for the OTel-enriched logger example."""
2+
3+
import pytest
4+
5+
from aws_durable_execution_sdk_python.execution import InvocationStatus
6+
from aws_durable_execution_sdk_python.lambda_service import OperationType
7+
from src.otel import otel_logger_example
8+
from test.conftest import deserialize_operation_payload
9+
10+
11+
@pytest.mark.example
@pytest.mark.durable_execution(
    handler=otel_logger_example.handler,
    lambda_function_name="Otel Logger Example",
)
def test_otel_logger_example(durable_runner):
    """Run the OTel logger example end to end and check its recorded outcome."""
    with durable_runner:
        outcome = durable_runner.run(input="{}", timeout=10)

    # Overall invocation result.
    assert outcome.status is InvocationStatus.SUCCEEDED
    final_payload = deserialize_operation_payload(outcome.result)
    assert final_payload == "hello world | hello nested"

    # The step executed directly by the handler is named "top-greet".
    top_step = outcome.get_step("top-greet")
    top_payload = deserialize_operation_payload(top_step.result)
    assert top_payload == "hello world"

    # Running the nested step in a child context must record at least one
    # CONTEXT operation.
    has_context_op = any(
        op.operation_type is OperationType.CONTEXT for op in outcome.operations
    )
    assert has_context_op

packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from aws_durable_execution_sdk_python_otel.deterministic_id_generator import (
1010
DeterministicIdGenerator,
1111
)
12+
from aws_durable_execution_sdk_python_otel.logger import OtelEnrichedLogger
1213
from aws_durable_execution_sdk_python_otel.plugin import (
1314
DurableExecutionOtelPlugin,
1415
)
@@ -19,6 +20,7 @@
1920
"ContextExtractor",
2021
"DeterministicIdGenerator",
2122
"DurableExecutionOtelPlugin",
23+
"OtelEnrichedLogger",
2224
"w3c_client_context_extractor",
2325
"xray_context_extractor",
2426
]
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""OTel-enriched logger for durable executions.
2+
3+
Provides a LoggerInterface wrapper that injects OpenTelemetry trace context
4+
(trace_id, span_id, trace_sampled) into every log message's extra dict. This
5+
enables log-trace correlation in observability backends without changing user code.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
from collections.abc import Mapping
11+
from typing import TYPE_CHECKING
12+
13+
from opentelemetry.trace import TraceFlags
14+
15+
16+
if TYPE_CHECKING:
17+
from aws_durable_execution_sdk_python.types import LoggerInterface
18+
19+
from aws_durable_execution_sdk_python_otel.plugin import DurableExecutionOtelPlugin
20+
21+
22+
class OtelEnrichedLogger:
    """Logger wrapper that adds OTel trace context to each call's ``extra``.

    Every logging method copies the caller's ``extra`` mapping, adds the trace
    fields when the plugin reports a valid span context, and then forwards the
    call to the wrapped logger.

    Fields added when a valid span context is available:
        - otel.trace_id: trace id as zero-padded hex (width 32)
        - otel.span_id: span id as zero-padded hex (width 16)
        - otel.trace_sampled: True when the SAMPLED trace flag is set

    Args:
        inner: Logger that receives the enriched calls.
        plugin: Plugin queried through ``get_current_span_context()`` for the
            span context to report.
    """

    def __init__(
        self, inner: LoggerInterface, plugin: DurableExecutionOtelPlugin
    ) -> None:
        self._inner = inner
        self._plugin = plugin

    def debug(
        self, msg: object, *args: object, extra: Mapping[str, object] | None = None
    ) -> None:
        """Forward a debug-level call with enriched ``extra``."""
        fields = self._enrich(extra)
        self._inner.debug(msg, *args, extra=fields)

    def info(
        self, msg: object, *args: object, extra: Mapping[str, object] | None = None
    ) -> None:
        """Forward an info-level call with enriched ``extra``."""
        fields = self._enrich(extra)
        self._inner.info(msg, *args, extra=fields)

    def warning(
        self, msg: object, *args: object, extra: Mapping[str, object] | None = None
    ) -> None:
        """Forward a warning-level call with enriched ``extra``."""
        fields = self._enrich(extra)
        self._inner.warning(msg, *args, extra=fields)

    def error(
        self, msg: object, *args: object, extra: Mapping[str, object] | None = None
    ) -> None:
        """Forward an error-level call with enriched ``extra``."""
        fields = self._enrich(extra)
        self._inner.error(msg, *args, extra=fields)

    def exception(
        self, msg: object, *args: object, extra: Mapping[str, object] | None = None
    ) -> None:
        """Forward an ``exception`` call with enriched ``extra``."""
        fields = self._enrich(extra)
        self._inner.exception(msg, *args, extra=fields)

    def _enrich(self, extra: Mapping[str, object] | None) -> dict[str, object]:
        """Return a new dict holding ``extra`` plus the OTel trace fields.

        The caller's mapping is copied, never mutated. When the plugin has no
        span context, or the one it returns is not valid, the copy is returned
        without any trace fields.
        """
        fields: dict[str, object] = {} if not extra else dict(extra)

        ctx = self._plugin.get_current_span_context()
        if not ctx or not ctx.is_valid:
            return fields

        fields.update(
            {
                "otel.trace_id": f"{ctx.trace_id:032x}",
                "otel.span_id": f"{ctx.span_id:016x}",
                "otel.trace_sampled": bool(ctx.trace_flags & TraceFlags.SAMPLED),
            }
        )
        return fields

packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
)
2121
from opentelemetry import context, trace
2222
from opentelemetry.context import Context
23-
from opentelemetry.sdk.trace import TracerProvider
2423
from opentelemetry.sdk.trace import TracerProvider as SdkTracerProvider
2524
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
2625
from opentelemetry.trace import (
@@ -40,10 +39,11 @@
4039
DeterministicIdGenerator,
4140
operation_id_to_span_id,
4241
)
42+
from aws_durable_execution_sdk_python_otel.logger import OtelEnrichedLogger
4343

4444

4545
if TYPE_CHECKING:
46-
pass
46+
from aws_durable_execution_sdk_python.types import LoggerInterface
4747

4848

4949
logger = logging.getLogger(__name__)
@@ -86,13 +86,15 @@ def __init__(
8686
context_extractor: ContextExtractor | None = None,
8787
sampling_rate: float = 1.0,
8888
instrument_name: str = DEFAULT_INSTRUMENT_NAME,
89+
enrich_logger: bool = True,
8990
) -> None:
9091
"""Initialize the plugin with an OpenTelemetry tracer provider.
9192
9293
The provided tracer provider is configured with this plugin's
9394
deterministic ID generator and sampling strategy so spans for a durable
9495
execution share stable trace and logical operation identifiers.
9596
"""
97+
self._enrich_logger = enrich_logger
9698
self._context_extractor: ContextExtractor = (
9799
context_extractor or xray_context_extractor
98100
)
@@ -115,6 +117,24 @@ def __init__(
115117
self._operation_spans: dict[str | None, Span] = {}
116118
self._operation_spans_lock = threading.RLock()
117119

120+
def wrap_logger(self, logger: LoggerInterface) -> LoggerInterface | None:
    """Return an OTel-enriching wrapper around the execution logger, if needed.

    Args:
        logger: The current logger interface from the execution context.

    Returns:
        A new ``OtelEnrichedLogger`` that delegates to ``logger``, or ``None``
        when logger enrichment is disabled on this plugin or ``logger`` is
        already an ``OtelEnrichedLogger`` (so it is never wrapped twice).
    """
    # Enrichment was switched off when the plugin was constructed.
    if not self._enrich_logger:
        return None

    # Already wrapped: nothing to do.
    if isinstance(logger, OtelEnrichedLogger):
        return None

    return OtelEnrichedLogger(inner=logger, plugin=self)
137+
118138
def _set_span(self, operation_id: str | None, span: Span) -> None:
119139
"""Register the active span for an operation ID."""
120140
with self._operation_spans_lock:
@@ -130,6 +150,34 @@ def _get_span(self, operation_id: str | None) -> Span | None:
130150
with self._operation_spans_lock:
131151
return self._operation_spans.get(operation_id)
132152

153+
def get_current_span_context(self) -> SpanContext | None:
    """Pick the span context that log lines should be correlated with.

    Lookup order:
        1. The span that is current in the OTel context, when its span
           context is valid.
        2. Otherwise the span registered in this plugin under operation ID
           ``None`` (the invocation span), when its span context is valid.

    Returns:
        A valid SpanContext, or None when neither lookup yields one.
    """
    active = trace.get_current_span().get_span_context()
    if active and active.is_valid:
        return active

    # No valid current span: fall back to the span stored under the
    # ``None`` operation ID in the plugin registry.
    registered = self._get_span(None)
    fallback = registered.get_span_context() if registered else None
    if fallback and fallback.is_valid:
        return fallback
    return None
180+
133181
# ------------------------------------------------------------------
134182
# Context resolution
135183
# ------------------------------------------------------------------
@@ -256,7 +304,7 @@ def on_invocation_start(self, info: InvocationStartInfo) -> None:
256304

257305
self._start_span(
258306
operation_id=None,
259-
name=f"invocation",
307+
name="invocation",
260308
attributes=self._extract_attributes(info),
261309
)
262310

@@ -413,7 +461,16 @@ def on_user_function_end(self, info: UserFunctionEndInfo) -> None:
413461
if end_timestamp is not None and end_timestamp == info.start_time:
414462
end_timestamp += datetime.timedelta(microseconds=1)
415463
self._end_span(info.operation_id, end_timestamp)
416-
# We don't call context.detach because the next operation will override it anyway
464+
# Restore the enclosing operation span as current so code that runs
465+
# after this operation (e.g. between steps in a child context)
466+
# correlates to its enclosing operation, not the operation that just
467+
# ended. For a top-level operation (parent_id is None) this is the
468+
# invocation span; for a nested operation it is the parent context span.
469+
parent_span = self._get_span(info.parent_id) or self._get_span(None)
470+
if parent_span:
471+
context.attach(
472+
trace.set_span_in_context(parent_span, self._extracted_context)
473+
)
417474

418475
def _extract_attributes(self, info: Any) -> dict[str, str]:
419476
"""Extract durable execution fields as OpenTelemetry span attributes.

0 commit comments

Comments
 (0)