Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t

## Unreleased

- fix(lambda-layer): Standardize CompactConsoleLogRecordExporter output with CloudWatch OTLP backend schema.
  ([#715](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/715))
- feat: support environment-configured endpoint visibility for HTTP operation names
([#718](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/718))
- fix(lambda-layer): Disable all agentic instrumentation in Lambda by default
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,126 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import json
import logging
import re
import sys
from typing import IO, Sequence

# Newer SDK versions renamed LogExportResult to LogRecordExportResult;
# alias whichever exists to a single local name.
try:
    from opentelemetry.sdk._logs.export import LogRecordExportResult as LogExportResult
except ImportError:
    from opentelemetry.sdk._logs.export import LogExportResult

# Support both old (LogData/LogExporter) and new (ReadableLogRecord/LogRecordExporter) APIs
try:
    from opentelemetry.sdk._logs.export import LogRecordExporter

    _BASE_CLASS = LogRecordExporter
except ImportError:
    from opentelemetry.sdk._logs.export import LogExporter

    _BASE_CLASS = LogExporter

_logger = logging.getLogger(__name__)


def _preserve_attrs(attributes) -> dict:
"""Preserve attribute value types (int, float, bool, str, list)."""
if not attributes:
Comment thread
Miqueasher marked this conversation as resolved.
return {}
return dict(attributes)


def _get_dropped_attrs(data, record) -> int:
"""Extract dropped attributes count from whichever object has it."""
if hasattr(data, "dropped_attributes"):
Comment thread
Miqueasher marked this conversation as resolved.
return data.dropped_attributes or 0
Comment thread
Miqueasher marked this conversation as resolved.
if hasattr(record, "dropped_attributes"):
return record.dropped_attributes or 0
return 0


class CompactConsoleLogRecordExporter(_BASE_CLASS):
    """Exports log records as compact JSON to stdout.

    Produces a single-line JSON object per log record aligned with the
    CloudWatch OTLP backend's flattened JSON format.

    If the standardized serialization fails for any reason, falls back to
    the upstream SDK's to_json() format to avoid breaking existing
    infrastructure.
    """

    def __init__(self, out: IO = None):
        """Create an exporter writing to *out* (defaults to ``sys.stdout``)."""
        self._out = out or sys.stdout
        self._shutdown = False

    def export(self, batch: Sequence) -> LogExportResult:
        """Serialize each record in *batch* to one JSON line and flush it.

        Returns FAILURE once the exporter has been shut down. A record
        whose compact serialization AND fallback serialization both fail
        is skipped (logged at debug) rather than failing the batch.
        """
        if self._shutdown:
            return LogExportResult.FAILURE

        for data in batch:
            try:
                line = self._to_compact_json(data)
            except Exception:  # pylint: disable=broad-exception-caught
                _logger.debug(
                    "Failed to serialize log record, falling back",
                    exc_info=True,
                )
                try:
                    line = self._fallback_format(data)
                except Exception:  # pylint: disable=broad-exception-caught
                    _logger.debug("Fallback also failed", exc_info=True)
                    continue

            self._out.write(line + "\n")
            self._out.flush()

        return LogExportResult.SUCCESS

    def shutdown(self):
        """Mark the exporter shut down; subsequent export() calls fail fast."""
        self._shutdown = True

    @staticmethod
    def _to_compact_json(data) -> str:
        """Serialize one record to a single-line JSON string.

        Supports both ReadableLogRecord (SDK 1.39+) and LogData (older)
        APIs: the resource may live on either wrapper, and the scope is
        read from the wrapper when present.
        """
        record = data.log_record
        # Resource lives on the wrapper (LogData) or the record itself.
        resource = getattr(data, "resource", None) or getattr(record, "resource", None)
        # NOTE(review): scope is only looked up on the wrapper; assumes the
        # older API also exposes instrumentation_scope there — confirm.
        scope = getattr(data, "instrumentation_scope", None)

        trace_id = getattr(record, "trace_id", None)
        span_id = getattr(record, "span_id", None)
        # Per OTLP convention, an all-zero trace/span id means "no context";
        # emit empty strings in that case rather than zero-padded hex.
        is_valid = trace_id is not None and span_id is not None and trace_id != 0 and span_id != 0

        return json.dumps(
            {
                "resource": {
                    "attributes": _preserve_attrs(resource.attributes if resource else None),
                    "schemaUrl": getattr(resource, "schema_url", "") or "" if resource else "",
                },
                "scope": {
                    "name": getattr(scope, "name", "") or "" if scope else "",
                    "version": getattr(scope, "version", "") or "" if scope else "",
                    "schemaUrl": getattr(scope, "schema_url", "") or "" if scope else "",
                },
                "body": record.body if record.body is not None else None,
                "severityNumber": (record.severity_number.value if record.severity_number is not None else 0),
                "severityText": (record.severity_number.name if record.severity_number is not None else "UNSPECIFIED"),
                "attributes": _preserve_attrs(record.attributes),
                "droppedAttributes": _get_dropped_attrs(data, record),
                "timeUnixNano": record.timestamp or 0,
                "observedTimeUnixNano": (record.observed_timestamp or 0),
                "traceId": format(trace_id, "032x") if is_valid else "",
                "spanId": format(span_id, "016x") if is_valid else "",
                "flags": int(record.trace_flags) if record.trace_flags is not None else 0,
                "exportPath": "console",
            },
            # Compact separators: no whitespace after ',' or ':'.
            separators=(",", ":"),
        )

    @staticmethod
    def _fallback_format(data) -> str:
        """Fall back to upstream SDK's to_json() with whitespace stripped."""
        # ReadableLogRecord has to_json() directly; LogData has it on .log_record
        obj = data if hasattr(data, "to_json") else data.log_record
        formatted_json = obj.to_json()
        return re.sub(r"\s*([{}[\]:,])\s*", r"\1", formatted_json)
Comment thread
Miqueasher marked this conversation as resolved.
Loading
Loading