Skip to content

Commit c2682b8

Browse files
Alex Wangwangyb-A
authored andcommitted
feat: Add otel logger
1 parent 74cebf1 commit c2682b8

10 files changed

Lines changed: 369 additions & 473 deletions

File tree

packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
from aws_durable_execution_sdk_python_otel.deterministic_id_generator import (
1010
DeterministicIdGenerator,
1111
)
12-
from aws_durable_execution_sdk_python_otel.logger import OtelEnrichedLogger
12+
from aws_durable_execution_sdk_python_otel.log_filter import (
13+
OtelContextLogFilter,
14+
install_log_filter,
15+
)
1316
from aws_durable_execution_sdk_python_otel.plugin import (
1417
DurableExecutionOtelPlugin,
1518
)
@@ -20,7 +23,8 @@
2023
"ContextExtractor",
2124
"DeterministicIdGenerator",
2225
"DurableExecutionOtelPlugin",
23-
"OtelEnrichedLogger",
26+
"OtelContextLogFilter",
27+
"install_log_filter",
2428
"w3c_client_context_extractor",
2529
"xray_context_extractor",
2630
]
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
"""Root-logger filter that stamps OTel trace context onto every log record.
2+
3+
The filter attaches to a stdlib logging handler and enriches *every* record
4+
that flows through it: direct ``logging.getLogger().info(...)`` calls,
5+
child-logger records that propagate to root, and third-party library logs.
6+
7+
The span/trace identifiers are added as ``LogRecord`` attributes (using
8+
underscore names, since dotted names are not valid record attributes):
9+
10+
- ``otel_trace_id``: 32-char hex trace identifier
11+
- ``otel_span_id``: 16-char hex span identifier
12+
- ``otel_trace_sampled``: boolean indicating if the trace is sampled
13+
14+
These attributes are only set when a valid span context is active. Records
15+
emitted outside an active invocation (e.g. during Lambda teardown) pass through
16+
unmodified, so any log formatter or schema must treat the fields as optional.
17+
"""
18+
19+
from __future__ import annotations
20+
21+
import logging
22+
from typing import TYPE_CHECKING
23+
24+
from opentelemetry.trace import TraceFlags
25+
26+
27+
if TYPE_CHECKING:
28+
from aws_durable_execution_sdk_python_otel.plugin import DurableExecutionOtelPlugin
29+
30+
31+
class OtelContextLogFilter(logging.Filter):
32+
"""Logging filter that injects the active OTel span context onto records.
33+
34+
The filter is a pure reader of the plugin's current span context. It
35+
resolves the span at emit time, on the thread that emits the record, via
36+
``plugin.get_current_span_context()``. That method returns the active
37+
operation span inside steps and child contexts (attached to the worker
38+
thread's OTel context) and falls back to the invocation span for top-level
39+
handler code.
40+
41+
The filter never caches identifiers and always returns ``True`` so it never
42+
drops a record.
43+
44+
Args:
45+
plugin: The OTel plugin instance that resolves the current span context.
46+
"""
47+
48+
def __init__(self, plugin: DurableExecutionOtelPlugin) -> None:
49+
super().__init__()
50+
self._plugin = plugin
51+
52+
def filter(self, record: logging.LogRecord) -> bool:
53+
"""Stamp the active span context onto the record, then allow it through."""
54+
span_context = self._plugin.get_current_span_context()
55+
if span_context and span_context.is_valid:
56+
record.otel_trace_id = format(span_context.trace_id, "032x")
57+
record.otel_span_id = format(span_context.span_id, "016x")
58+
record.otel_trace_sampled = bool(
59+
span_context.trace_flags & TraceFlags.SAMPLED
60+
)
61+
return True
62+
63+
64+
def install_log_filter(
65+
plugin: DurableExecutionOtelPlugin,
66+
target_logger: logging.Logger | None = None,
67+
) -> OtelContextLogFilter | None:
68+
"""Attach an OtelContextLogFilter to a logger's handlers, idempotently.
69+
70+
The filter is attached to each handler on ``target_logger`` (the root logger
71+
by default). Attaching to handlers rather than the logger itself ensures
72+
records propagated from child loggers are also enriched, since handler
73+
filters run for every record reaching the handler.
74+
75+
This is safe to call on every invocation: if a handler already has an
76+
OtelContextLogFilter, it is left as-is, so warm Lambda reuse will not stack
77+
duplicate filters. A single shared filter instance is reused across all
78+
handlers.
79+
80+
Args:
81+
plugin: The OTel plugin that resolves the current span context.
82+
target_logger: Logger whose handlers receive the filter. Defaults to the
83+
root logger, which in AWS Lambda is where runtime log handlers live.
84+
85+
Returns:
86+
The filter instance attached to the handlers, or ``None`` if the target
87+
logger has no handlers to attach to.
88+
"""
89+
logger = target_logger if target_logger is not None else logging.getLogger()
90+
91+
context_filter: OtelContextLogFilter | None = None
92+
for handler in logger.handlers:
93+
existing = next(
94+
(f for f in handler.filters if isinstance(f, OtelContextLogFilter)),
95+
None,
96+
)
97+
if existing is not None:
98+
# Reuse the already-installed filter so a single instance is shared.
99+
context_filter = existing
100+
continue
101+
if context_filter is None:
102+
context_filter = OtelContextLogFilter(plugin)
103+
handler.addFilter(context_filter)
104+
105+
return context_filter

packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/logger.py

Lines changed: 0 additions & 86 deletions
This file was deleted.

packages/aws-durable-execution-sdk-python-otel/src/aws_durable_execution_sdk_python_otel/plugin.py

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import datetime
66
import logging
77
import threading
8-
from typing import TYPE_CHECKING, Any
8+
from typing import Any
99

1010
from aws_durable_execution_sdk_python.lambda_service import OperationType
1111
from aws_durable_execution_sdk_python.plugin import (
@@ -39,11 +39,7 @@
3939
DeterministicIdGenerator,
4040
operation_id_to_span_id,
4141
)
42-
from aws_durable_execution_sdk_python_otel.logger import OtelEnrichedLogger
43-
44-
45-
if TYPE_CHECKING:
46-
from aws_durable_execution_sdk_python.types import LoggerInterface
42+
from aws_durable_execution_sdk_python_otel.log_filter import install_log_filter
4743

4844

4945
logger = logging.getLogger(__name__)
@@ -93,6 +89,10 @@ def __init__(
9389
The provided tracer provider is configured with this plugin's
9490
deterministic ID generator and sampling strategy so spans for a durable
9591
execution share stable trace and logical operation identifiers.
92+
93+
When enrich_logger is enabled (default), the plugin installs a logging
94+
filter on the root logger at invocation start that stamps the active
95+
OTel trace context onto every emitted log record.
9696
"""
9797
self._enrich_logger = enrich_logger
9898
self._context_extractor: ContextExtractor = (
@@ -117,24 +117,6 @@ def __init__(
117117
self._operation_spans: dict[str | None, Span] = {}
118118
self._operation_spans_lock = threading.RLock()
119119

120-
def wrap_logger(self, logger: LoggerInterface) -> LoggerInterface | None:
121-
"""Wrap the execution logger to inject OTel trace context.
122-
123-
When enrich_logger is enabled (default), returns an OtelEnrichedLogger
124-
that adds trace_id, span_id, and trace_sampled to every log message.
125-
Idempotent: returns None if the logger is already an OtelEnrichedLogger.
126-
127-
Args:
128-
logger: The current logger interface from the execution context.
129-
130-
Returns:
131-
An OtelEnrichedLogger wrapping the input, or None if disabled or
132-
already wrapped.
133-
"""
134-
if not self._enrich_logger or isinstance(logger, OtelEnrichedLogger):
135-
return None
136-
return OtelEnrichedLogger(inner=logger, plugin=self)
137-
138120
def _set_span(self, operation_id: str | None, span: Span) -> None:
139121
"""Register the active span for an operation ID."""
140122
with self._operation_spans_lock:
@@ -225,8 +207,8 @@ def _start_span(
225207
Returns:
226208
The started OpenTelemetry span.
227209
"""
228-
logger.info(
229-
"starting a span: operation_id=%s, name=%s, parent_span=%s",
210+
logger.debug(
211+
"Starting OTel span: operation_id=%s, name=%s, parent_span=%s",
230212
operation_id,
231213
name,
232214
parent_span,
@@ -269,7 +251,7 @@ def _start_span(
269251
)
270252
self._operation_spans[operation_id] = span
271253

272-
logger.info("started a span: %s", span)
254+
logger.debug("Started OTel span: %s", span)
273255
return span
274256

275257
def _end_span(
@@ -283,25 +265,31 @@ def _end_span(
283265
end_timestamp: Optional durable end timestamp to use as the span end
284266
time. When omitted, OpenTelemetry uses the current time.
285267
"""
286-
logger.info("ending a span for operation: %s", operation_id)
268+
logger.debug("Ending OTel span: operation_id=%s", operation_id)
287269
with self._operation_spans_lock:
288270
span = self._operation_spans.pop(operation_id, None)
289271
if span:
290272
# the span is not going to be populated if it has the same end_time and start_time
291273
end_time = _to_otel_timestamp(end_timestamp) if end_timestamp else None
292274
span.end(end_time=end_time)
293-
logger.info("ended otel span: %s", span)
275+
logger.debug("Ended OTel span: %s", span)
294276

295277
# ------------------------------------------------------------------
296278
# Plugin lifecycle callbacks
297279
# ------------------------------------------------------------------
298280
def on_invocation_start(self, info: InvocationStartInfo) -> None:
299281
"""Called at the start of each invocation. Creates the invocation span."""
300-
logger.info("Invocation started: %s", info)
282+
logger.debug("Durable invocation started: %s", info)
301283
self._execution_arn = info.execution_arn or ""
302284
self._extracted_context = self._context_extractor(info)
303285
self._id_generator.set_trace_id(self._execution_arn, info.start_time)
304286

287+
if self._enrich_logger:
288+
# Install (idempotently) the root-logger filter so every log record
289+
# emitted during this invocation is stamped with the active span
290+
# context. Safe to call on every invocation, including warm reuse.
291+
install_log_filter(self)
292+
305293
self._start_span(
306294
operation_id=None,
307295
name="invocation",
@@ -310,7 +298,7 @@ def on_invocation_start(self, info: InvocationStartInfo) -> None:
310298

311299
def on_invocation_end(self, info: InvocationEndInfo) -> None:
312300
"""Called at the end of each invocation. Ends the invocation span and flushes."""
313-
logger.info(f"Invocation ended: {info}")
301+
logger.debug("Durable invocation ended: %s", info)
314302
end_time = info.end_time
315303
# end all pending spans
316304
with self._operation_spans_lock:
@@ -334,7 +322,7 @@ def on_invocation_end(self, info: InvocationEndInfo) -> None:
334322

335323
def on_operation_start(self, info: OperationStartInfo) -> None:
336324
"""Called when an operation begins. Creates a span for the operation."""
337-
logger.info(f"Operation started: {info}")
325+
logger.debug("Durable operation started: %s", info)
338326
if info.operation_type in [OperationType.CONTEXT, OperationType.STEP]:
339327
# Context and Step operations are tracked using on_user_function_start
340328
return
@@ -358,7 +346,7 @@ def on_operation_end(self, info: OperationEndInfo) -> None:
358346
continuation span is created and linked to the deterministic logical
359347
operation span before being ended.
360348
"""
361-
logger.info(f"Operation ended: {info}")
349+
logger.debug("Durable operation ended: %s", info)
362350
if info.operation_type in [OperationType.CONTEXT, OperationType.STEP]:
363351
# Context and Step operations are tracked using on_user_function_end
364352
return
@@ -401,7 +389,7 @@ def on_user_function_start(self, info: UserFunctionStartInfo) -> None:
401389
Args:
402390
info: Information about the operation attempt.
403391
"""
404-
logger.info("User function started: %s", info)
392+
logger.debug("Durable user function started: %s", info)
405393
# Context and Step operations are tracked using on_user_function_start
406394
if info.operation_type not in [OperationType.CONTEXT, OperationType.STEP]:
407395
raise RuntimeError(
@@ -429,7 +417,7 @@ def on_user_function_end(self, info: UserFunctionEndInfo) -> None:
429417
Args:
430418
info: Information about the operation attempt.
431419
"""
432-
logger.info("User function ended: %s", info)
420+
logger.debug("Durable user function ended: %s", info)
433421
if info.operation_type not in [OperationType.CONTEXT, OperationType.STEP]:
434422
raise RuntimeError(
435423
"on_user_function_end should only be called for CONTEXT and STEP operations"

0 commit comments

Comments
 (0)