Skip to content

Commit 31ede4a

Browse files
authored
Standardize Python Log Exporter JSON output to match canonical schema (#715)
The ADOT Lambda layers support `OTEL_LOGS_EXPORTER=console`, which writes log records as compact JSON to stdout. Lambda's FluxPump reads stdout and forwards to PLE. Previously, each language (Java, Python, JS, .NET) produced different JSON output because Python and JS delegated to their upstream OTel SDK's serialization methods, which each made independent formatting choices (snake_case vs camelCase field names, 0x-prefixed trace IDs, different timestamp formats, missing fields). This PR aligns the Python exporter output with the canonical schema defined by the Java implementation, ensuring consistent JSON structure across all ADOT language SDKs - Rewrites the `CompactConsoleLogRecordExporter` to build JSON directly from `LogRecord` fields instead of delegating to the upstream SDK's `to_json()` serialization, which produced a different schema than the Java reference - Adds try-catch fallback to the original upstream SDK format if the new serialization fails, to avoid breaking existing customer infrastructure By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent aefb06e commit 31ede4a

4 files changed

Lines changed: 457 additions & 54 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t
1212

1313
## Unreleased
1414

15+
- fix(lambda-layer): Standardize CompactConsoleLogRecordExporter output with CloudWatch OTLP backend schema.
16+
([#715](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/715))
1517
- feat(agent-observability): add `AWS_GENAI_CONTENT_EXTRACTION_OPT_OUT` env var to allow disabling LLO content extraction from spans
1618
([#741](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/741))
1719
- fix(mcp-instrumentation): suppress MCP `/ping` spans when agent observability is enabled
Lines changed: 120 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,128 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3+
import json
4+
import logging
5+
import os
36
import re
4-
from typing import Sequence
7+
import sys
8+
from typing import IO, Sequence
59

6-
from opentelemetry.sdk._logs import ReadableLogRecord
7-
from opentelemetry.sdk._logs.export import ConsoleLogRecordExporter, LogRecordExportResult
10+
try:
11+
from opentelemetry.sdk._logs.export import LogRecordExportResult as LogExportResult
12+
except ImportError:
13+
from opentelemetry.sdk._logs.export import LogExportResult
814

15+
# Support both old (LogData/LogExporter) and new (ReadableLogRecord/LogRecordExporter) APIs
16+
try:
17+
from opentelemetry.sdk._logs.export import LogRecordExporter
18+
19+
_BASE_CLASS = LogRecordExporter
20+
except ImportError:
21+
from opentelemetry.sdk._logs.export import LogExporter
22+
23+
_BASE_CLASS = LogExporter
24+
25+
_logger = logging.getLogger(__name__)
26+
27+
28+
def _preserve_attrs(attributes) -> dict:
29+
"""Preserve attribute value types (int, float, bool, str, list)."""
30+
if not attributes:
31+
return {}
32+
return dict(attributes)
33+
34+
35+
def _get_dropped_attrs(data, record) -> int:
36+
"""Extract dropped attributes count from whichever object has it."""
37+
if hasattr(data, "dropped_attributes"):
38+
return data.dropped_attributes or 0
39+
if hasattr(record, "dropped_attributes"):
40+
return record.dropped_attributes or 0
41+
return 0
42+
43+
44+
class CompactConsoleLogRecordExporter(_BASE_CLASS):
45+
"""Exports log records as compact JSON to stdout.
46+
47+
Produces a single-line JSON object per log record aligned with the
48+
CloudWatch OTLP backend's flattened JSON format.
49+
50+
If the standardized serialization fails for any reason, falls back to
51+
the upstream SDK's to_json() format to avoid breaking existing infrastructure.
52+
"""
53+
54+
def __init__(self, out: IO = None):
55+
self._out = out or sys.stdout
56+
self._shutdown = False
57+
58+
def export(self, batch: Sequence) -> LogExportResult:
59+
if self._shutdown:
60+
return LogExportResult.FAILURE
961

10-
class CompactConsoleLogRecordExporter(ConsoleLogRecordExporter):
11-
def export(self, batch: Sequence[ReadableLogRecord]):
1262
for data in batch:
13-
formatted_json = self.formatter(data.log_record)
14-
print(re.sub(r"\s*([{}[\]:,])\s*", r"\1", formatted_json), flush=True)
63+
try:
64+
line = self._to_compact_json(data)
65+
except Exception: # pylint: disable=broad-exception-caught
66+
_logger.debug(
67+
"Failed to serialize log record, falling back",
68+
exc_info=True,
69+
)
70+
try:
71+
line = self._fallback_format(data)
72+
except Exception: # pylint: disable=broad-exception-caught
73+
_logger.debug("Fallback also failed", exc_info=True)
74+
continue
75+
76+
self._out.write(line + "\n")
77+
self._out.flush()
78+
79+
return LogExportResult.SUCCESS
80+
81+
def shutdown(self):
82+
self._out.flush()
83+
self._shutdown = True
84+
85+
@staticmethod
86+
def _to_compact_json(data) -> str:
87+
# Support both ReadableLogRecord (1.39+) and LogData (older) APIs.
88+
record = data.log_record
89+
resource = getattr(data, "resource", None) or getattr(record, "resource", None)
90+
scope = getattr(data, "instrumentation_scope", None)
91+
92+
trace_id = getattr(record, "trace_id", None)
93+
span_id = getattr(record, "span_id", None)
94+
is_valid = trace_id is not None and span_id is not None and trace_id != 0 and span_id != 0
95+
96+
return json.dumps(
97+
{
98+
"resource": {
99+
"attributes": _preserve_attrs(resource.attributes if resource else None),
100+
"schemaUrl": (getattr(resource, "schema_url", "") or "") if resource else "",
101+
},
102+
"scope": {
103+
"name": (getattr(scope, "name", "") or "") if scope else "",
104+
"version": (getattr(scope, "version", "") or "") if scope else "",
105+
"schemaUrl": (getattr(scope, "schema_url", "") or "") if scope else "",
106+
},
107+
"body": record.body if record.body is not None else None,
108+
"severityNumber": (record.severity_number.value if record.severity_number is not None else 0),
109+
"severityText": (record.severity_number.name if record.severity_number is not None else "UNSPECIFIED"),
110+
"attributes": _preserve_attrs(record.attributes),
111+
"droppedAttributes": _get_dropped_attrs(data, record),
112+
"timeUnixNano": record.timestamp or 0,
113+
"observedTimeUnixNano": (record.observed_timestamp or 0),
114+
"traceId": format(trace_id, "032x") if is_valid else "",
115+
"spanId": format(span_id, "016x") if is_valid else "",
116+
"flags": int(record.trace_flags) if record.trace_flags is not None else 0,
117+
**({"exportPath": "console"} if os.environ.get("ADOT_TEST_EXPORT_PATH_ENABLED") == "true" else {}),
118+
},
119+
separators=(",", ":"),
120+
)
15121

16-
return LogRecordExportResult.SUCCESS
122+
@staticmethod
123+
def _fallback_format(data) -> str:
124+
"""Fall back to upstream SDK's to_json() with whitespace stripped."""
125+
# ReadableLogRecord has to_json() directly; LogData has it on .log_record
126+
obj = data if hasattr(data, "to_json") else data.log_record
127+
formatted_json = obj.to_json()
128+
return re.sub(r"\s*([{}[\]:,])\s*", r"\1", formatted_json)

0 commit comments

Comments
 (0)