diff --git a/CHANGELOG.md b/CHANGELOG.md index bab866751..b85ec3b2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t ## Unreleased +- fix(lambda-layer): Standardize CompactConsoleLogRecordExporter output with CloudWatch OTLP backend schema. + ([#715](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/715)) - feat(agent-observability): add `AWS_GENAI_CONTENT_EXTRACTION_OPT_OUT` env var to allow disabling LLO content extraction from spans ([#741](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/741)) - fix(mcp-instrumentation): suppress MCP `/ping` spans when agent observability is enabled diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/console/logs/compact_console_log_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/console/logs/compact_console_log_exporter.py index 83f58dfa0..d0e7f01f5 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/console/logs/compact_console_log_exporter.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/console/logs/compact_console_log_exporter.py @@ -1,16 +1,128 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 +import json +import logging +import os import re -from typing import Sequence +import sys +from typing import IO, Sequence -from opentelemetry.sdk._logs import ReadableLogRecord -from opentelemetry.sdk._logs.export import ConsoleLogRecordExporter, LogRecordExportResult +try: + from opentelemetry.sdk._logs.export import LogRecordExportResult as LogExportResult +except ImportError: + from opentelemetry.sdk._logs.export import LogExportResult +# Support both old (LogData/LogExporter) and new (ReadableLogRecord/LogRecordExporter) APIs +try: + from opentelemetry.sdk._logs.export import LogRecordExporter + + _BASE_CLASS = LogRecordExporter +except ImportError: + from opentelemetry.sdk._logs.export import LogExporter + + _BASE_CLASS = LogExporter + +_logger = logging.getLogger(__name__) + + +def _preserve_attrs(attributes) -> dict: + """Preserve attribute value types (int, float, bool, str, list).""" + if not attributes: + return {} + return dict(attributes) + + +def _get_dropped_attrs(data, record) -> int: + """Extract dropped attributes count from whichever object has it.""" + if hasattr(data, "dropped_attributes"): + return data.dropped_attributes or 0 + if hasattr(record, "dropped_attributes"): + return record.dropped_attributes or 0 + return 0 + + +class CompactConsoleLogRecordExporter(_BASE_CLASS): + """Exports log records as compact JSON to stdout. + + Produces a single-line JSON object per log record aligned with the + CloudWatch OTLP backend's flattened JSON format. + + If the standardized serialization fails for any reason, falls back to + the upstream SDK's to_json() format to avoid breaking existing infrastructure. + """ + + def __init__(self, out: IO = None): + self._out = out or sys.stdout + self._shutdown = False + + def export(self, batch: Sequence) -> LogExportResult: + if self._shutdown: + return LogExportResult.FAILURE -class CompactConsoleLogRecordExporter(ConsoleLogRecordExporter): - def export(self, batch: Sequence[ReadableLogRecord]): for data in batch: - formatted_json = self.formatter(data.log_record) - print(re.sub(r"\s*([{}[\]:,])\s*", r"\1", formatted_json), flush=True) + try: + line = self._to_compact_json(data) + except Exception: # pylint: disable=broad-exception-caught + _logger.debug( + "Failed to serialize log record, falling back", + exc_info=True, + ) + try: + line = self._fallback_format(data) + except Exception: # pylint: disable=broad-exception-caught + _logger.debug("Fallback also failed", exc_info=True) + continue + + self._out.write(line + "\n") + self._out.flush() + + return LogExportResult.SUCCESS + + def shutdown(self): + self._out.flush() + self._shutdown = True + + @staticmethod + def _to_compact_json(data) -> str: + # Support both ReadableLogRecord (1.39+) and LogData (older) APIs. + record = data.log_record + resource = getattr(data, "resource", None) or getattr(record, "resource", None) + scope = getattr(data, "instrumentation_scope", None) + + trace_id = getattr(record, "trace_id", None) + span_id = getattr(record, "span_id", None) + is_valid = trace_id is not None and span_id is not None and trace_id != 0 and span_id != 0 + + return json.dumps( + { + "resource": { + "attributes": _preserve_attrs(resource.attributes if resource else None), + "schemaUrl": (getattr(resource, "schema_url", "") or "") if resource else "", + }, + "scope": { + "name": (getattr(scope, "name", "") or "") if scope else "", + "version": (getattr(scope, "version", "") or "") if scope else "", + "schemaUrl": (getattr(scope, "schema_url", "") or "") if scope else "", + }, + "body": record.body if record.body is not None else None, + "severityNumber": (record.severity_number.value if record.severity_number is not None else 0), + "severityText": (record.severity_number.name if record.severity_number is not None else "UNSPECIFIED"), + "attributes": _preserve_attrs(record.attributes), + "droppedAttributes": _get_dropped_attrs(data, record), + "timeUnixNano": record.timestamp or 0, + "observedTimeUnixNano": (record.observed_timestamp or 0), + "traceId": format(trace_id, "032x") if is_valid else "", + "spanId": format(span_id, "016x") if is_valid else "", + "flags": int(record.trace_flags) if record.trace_flags is not None else 0, + **({"exportPath": "console"} if os.environ.get("ADOT_TEST_EXPORT_PATH_ENABLED") == "true" else {}), + }, + separators=(",", ":"), + ) - return LogRecordExportResult.SUCCESS + @staticmethod + def _fallback_format(data) -> str: + """Fall back to upstream SDK's to_json() with whitespace stripped.""" + # ReadableLogRecord has to_json() directly; LogData has it on .log_record + obj = data if hasattr(data, "to_json") else data.log_record + formatted_json = obj.to_json() + return re.sub(r"\s*([{}[\]:,])\s*", r"\1", formatted_json) diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/console/logs/test_compact_console_log_exporter.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/console/logs/test_compact_console_log_exporter.py index 7d4187613..d98efd548 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/console/logs/test_compact_console_log_exporter.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/console/logs/test_compact_console_log_exporter.py @@ -1,72 +1,336 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 +import inspect +import io +import json import unittest -from unittest.mock import Mock, patch +import unittest.mock from amazon.opentelemetry.distro.exporter.console.logs.compact_console_log_exporter import ( CompactConsoleLogRecordExporter, ) -from opentelemetry.sdk._logs.export import LogRecordExportResult +from opentelemetry._logs import SeverityNumber +try: + from opentelemetry.sdk._logs.export import LogRecordExportResult as LogExportResult +except ImportError: + from opentelemetry.sdk._logs.export import LogExportResult + +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.trace import TraceFlags + +# SDK 1.40+ moved LogRecord to _internal and removed 'resource' from its constructor. +# ReadableLogRecord wraps LogRecord with resource + scope on 1.40+. +# Older SDKs use LogData for the same purpose and LogRecord accepts 'resource'. +try: + from opentelemetry.sdk._logs import LogRecord +except ImportError: + from opentelemetry.sdk._logs._internal import LogRecord + +try: + from opentelemetry.sdk._logs import ReadableLogRecord +except ImportError: + ReadableLogRecord = None + +try: + from opentelemetry.sdk._logs import LogData +except ImportError: + LogData = None + +_LOG_RECORD_ACCEPTS_RESOURCE = "resource" in inspect.signature(LogRecord.__init__).parameters + + +def _make_log_data( + body="Test log message", + severity_number=SeverityNumber.INFO, + trace_id=int("12345678901234567890123456789012", 16), + span_id=int("1234567890123456", 16), + trace_flags=TraceFlags(TraceFlags.SAMPLED), + timestamp=1000000000 * 1_000_000_000, + observed_timestamp=1000000000 * 1_000_000_000, + attributes=None, + resource=None, + scope=None, +): + if attributes is None: + attributes = {"key": "value"} + if resource is None: + resource = Resource( + attributes={"service.name": "test-service"}, + schema_url="https://opentelemetry.io/schemas/1.0.0", + ) + if scope is None: + scope = InstrumentationScope( + name="test-scope", + version="1.0.0", + schema_url="https://opentelemetry.io/schemas/1.0.0", + ) + + log_record_kwargs = dict( + timestamp=timestamp, + observed_timestamp=observed_timestamp, + trace_id=trace_id, + span_id=span_id, + trace_flags=trace_flags, + severity_number=severity_number, + body=body, + attributes=attributes if attributes else None, + ) + if _LOG_RECORD_ACCEPTS_RESOURCE: + log_record_kwargs["resource"] = resource + + log_record = LogRecord(**log_record_kwargs) + + if ReadableLogRecord is not None: + return ReadableLogRecord( + log_record=log_record, + resource=resource, + instrumentation_scope=scope, + ) + return LogData(log_record=log_record, instrumentation_scope=scope) -class TestCompactConsoleLogRecordExporter(unittest.TestCase): +class TestCompactConsoleLogRecordExporter(unittest.TestCase): def setUp(self): - self.exporter = CompactConsoleLogRecordExporter() + self.buf = io.StringIO() + self.exporter = CompactConsoleLogRecordExporter(out=self.buf) + + def _get_output(self): + return self.buf.getvalue().strip() - @patch("builtins.print") - def test_export_compresses_json(self, mock_print): - # Mock log data - mock_log_data = Mock() - mock_log_record = Mock() - mock_log_data.log_record = mock_log_record + def _get_parsed(self): + return json.loads(self._get_output()) - # Mock formatted JSON with whitespace - formatted_json = '{\n "body": "test message",\n "severity_number": 9,\n "attributes": {\n "key": "value"\n }\n}' # noqa: E501 - self.exporter.formatter = Mock(return_value=formatted_json) + def test_export_with_all_fields_set(self): + data = _make_log_data() + result = self.exporter.export([data]) - # Call export - result = self.exporter.export([mock_log_data]) + self.assertEqual(result, LogExportResult.SUCCESS) + parsed = self._get_parsed() - # Verify result - self.assertEqual(result, LogRecordExportResult.SUCCESS) + # Validate all top-level fields are present + expected_keys = [ + "resource", + "scope", + "body", + "severityNumber", + "severityText", + "attributes", + "droppedAttributes", + "timeUnixNano", + "observedTimeUnixNano", + "traceId", + "spanId", + "flags", + ] + for key in expected_keys: + self.assertIn(key, parsed, f"Missing key: {key}") - # Verify print calls - self.assertEqual(mock_print.call_count, 1) - mock_print.assert_called_with( - '{"body":"test message","severity_number":9,"attributes":{"key":"value"}}', flush=True + # Validate values + self.assertEqual(parsed["body"], "Test log message") + self.assertEqual(parsed["severityNumber"], 9) + self.assertEqual(parsed["severityText"], "INFO") + self.assertEqual(parsed["attributes"], {"key": "value"}) + self.assertEqual(parsed["droppedAttributes"], 0) + self.assertEqual(parsed["timeUnixNano"], 1000000000 * 1_000_000_000) + self.assertEqual(parsed["observedTimeUnixNano"], 1000000000 * 1_000_000_000) + self.assertEqual(parsed["traceId"], "12345678901234567890123456789012") + self.assertEqual(parsed["spanId"], "1234567890123456") + self.assertEqual(parsed["flags"], 1) + + # Validate nested objects + self.assertIn("attributes", parsed["resource"]) + self.assertEqual(parsed["resource"]["attributes"]["service.name"], "test-service") + self.assertEqual( + parsed["resource"]["schemaUrl"], + "https://opentelemetry.io/schemas/1.0.0", ) - @patch("builtins.print") - def test_export_multiple_records(self, mock_print): - # Mock multiple log data - mock_log_data1 = Mock() - mock_log_data2 = Mock() - mock_log_data1.log_record = Mock() - mock_log_data2.log_record = Mock() + scope = parsed["scope"] + self.assertEqual(scope["name"], "test-scope") + self.assertEqual(scope["version"], "1.0.0") + self.assertEqual(scope["schemaUrl"], "https://opentelemetry.io/schemas/1.0.0") + + def test_null_body(self): + data = _make_log_data(body=None) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertIsNone(parsed["body"]) + + def test_zero_timestamps(self): + data = _make_log_data(timestamp=0, observed_timestamp=0) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertEqual(parsed["timeUnixNano"], 0) + self.assertEqual(parsed["observedTimeUnixNano"], 0) + + def test_invalid_span_context_all_zeros(self): + data = _make_log_data(trace_id=0, span_id=0) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertEqual(parsed["traceId"], "") + self.assertEqual(parsed["spanId"], "") - formatted_json = '{\n "body": "test"\n}' - self.exporter.formatter = Mock(return_value=formatted_json) + def test_invalid_trace_id_only(self): + data = _make_log_data(trace_id=0, span_id=int("1234567890123456", 16)) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertEqual(parsed["traceId"], "") + self.assertEqual(parsed["spanId"], "") - # Call export - result = self.exporter.export([mock_log_data1, mock_log_data2]) + def test_invalid_span_id_only(self): + data = _make_log_data(trace_id=int("12345678901234567890123456789012", 16), span_id=0) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertEqual(parsed["traceId"], "") + self.assertEqual(parsed["spanId"], "") - # Verify result - self.assertEqual(result, LogRecordExportResult.SUCCESS) + def test_no_span_context(self): + data = _make_log_data(trace_id=None, span_id=None, trace_flags=None) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertEqual(parsed["traceId"], "") + self.assertEqual(parsed["spanId"], "") + self.assertEqual(parsed["flags"], 0) - # Verify print calls - self.assertEqual(mock_print.call_count, 2) # 2 records - # Each record should print compact JSON - expected_calls = [unittest.mock.call('{"body":"test"}', flush=True)] * 2 - mock_print.assert_has_calls(expected_calls) + def test_empty_attributes(self): + data = _make_log_data(attributes={}) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertEqual(parsed["attributes"], {}) - @patch("builtins.print") - def test_export_empty_batch(self, mock_print): - # Call export with empty batch + def test_attribute_values_preserve_types(self): + data = _make_log_data(attributes={"count": 42, "enabled": True, "rate": 3.14, "name": "test"}) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertEqual(parsed["attributes"]["count"], 42) + self.assertEqual(parsed["attributes"]["enabled"], True) + self.assertEqual(parsed["attributes"]["rate"], 3.14) + self.assertEqual(parsed["attributes"]["name"], "test") + + def test_multiple_log_records(self): + data1 = _make_log_data(body="first") + data2 = _make_log_data(body="second") + self.exporter.export([data1, data2]) + lines = self.buf.getvalue().strip().split("\n") + self.assertEqual(len(lines), 2) + self.assertEqual(json.loads(lines[0])["body"], "first") + self.assertEqual(json.loads(lines[1])["body"], "second") + + def test_export_empty_batch(self): result = self.exporter.export([]) + self.assertEqual(result, LogExportResult.SUCCESS) + self.assertEqual(self.buf.getvalue(), "") + + def test_shutdown_prevents_export(self): + self.exporter.shutdown() + data = _make_log_data() + result = self.exporter.export([data]) + self.assertEqual(result, LogExportResult.FAILURE) + self.assertEqual(self.buf.getvalue(), "") + + def test_empty_resource_and_scope(self): + data = _make_log_data( + resource=Resource(attributes={}), + scope=InstrumentationScope(name=""), + ) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertEqual(parsed["resource"]["attributes"], {}) + self.assertEqual(parsed["resource"]["schemaUrl"], "") + self.assertEqual(parsed["scope"]["name"], "") + self.assertEqual(parsed["scope"]["version"], "") + self.assertEqual(parsed["scope"]["schemaUrl"], "") + + def test_timestamp_raw_nanos(self): + nanos = 1000000000 * 1_000_000_000 + 123 * 1_000_000 + data = _make_log_data(timestamp=nanos) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertEqual(parsed["timeUnixNano"], nanos) + + def test_timestamp_preserves_full_precision(self): + nanos = 1000000000 * 1_000_000_000 + 100 * 1_000_000 + data = _make_log_data(timestamp=nanos) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertEqual(parsed["timeUnixNano"], nanos) + + def test_severity_text_uses_enum_name(self): + for sev, expected_name in [ + (SeverityNumber.TRACE, "TRACE"), + (SeverityNumber.DEBUG, "DEBUG"), + (SeverityNumber.INFO, "INFO"), + (SeverityNumber.WARN, "WARN"), + (SeverityNumber.ERROR, "ERROR"), + (SeverityNumber.FATAL, "FATAL"), + ]: + self.buf = io.StringIO() + self.exporter = CompactConsoleLogRecordExporter(out=self.buf) + data = _make_log_data(severity_number=sev) + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertEqual(parsed["severityText"], expected_name) + self.assertEqual(parsed["severityNumber"], sev.value) + + def test_output_is_compact_single_line(self): + data = _make_log_data() + self.exporter.export([data]) + output = self._get_output() + self.assertNotIn("\n", output) + self.assertNotIn(" ", output) + + def test_fallback_on_serialization_error(self): + """If _to_compact_json fails, fallback should produce output.""" + data = _make_log_data() + with unittest.mock.patch.object( + CompactConsoleLogRecordExporter, + "_to_compact_json", + side_effect=ValueError("forced"), + ): + result = self.exporter.export([data]) + self.assertEqual(result, LogExportResult.SUCCESS) + output = self._get_output() + self.assertTrue(len(output) > 0) + + def test_fallback_also_fails_continues(self): + """If both _to_compact_json and _fallback_format fail, skip record.""" + data = _make_log_data() + with unittest.mock.patch.object( + CompactConsoleLogRecordExporter, + "_to_compact_json", + side_effect=ValueError("forced"), + ), unittest.mock.patch.object( + CompactConsoleLogRecordExporter, + "_fallback_format", + side_effect=ValueError("fallback forced"), + ): + result = self.exporter.export([data]) + self.assertEqual(result, LogExportResult.SUCCESS) + self.assertEqual(self.buf.getvalue(), "") + + def test_dropped_attributes_from_record(self): + """Test dropped_attributes extracted from record when data lacks it.""" + data = _make_log_data() + # Remove dropped_attributes from data if present, add to record + if hasattr(data, "log_record"): + data.log_record.dropped_attributes = 5 + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertIsInstance(parsed["droppedAttributes"], int) - # Verify result - self.assertEqual(result, LogRecordExportResult.SUCCESS) + def test_dropped_attributes_defaults_to_zero(self): + """Test droppedAttributes is always an integer.""" + data = _make_log_data() + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertIsInstance(parsed["droppedAttributes"], int) + self.assertEqual(parsed["droppedAttributes"], 0) - # Verify print calls - mock_print.assert_not_called() # No records, no prints + def test_export_path_field_absent_when_env_unset(self): + """Console exporter omits exportPath when env var is not set.""" + data = _make_log_data() + self.exporter.export([data]) + parsed = self._get_parsed() + self.assertNotIn("exportPath", parsed) diff --git a/lambda-layer/sample-apps/function/lambda_function.py b/lambda-layer/sample-apps/function/lambda_function.py index c631abb86..ff61a3641 100644 --- a/lambda-layer/sample-apps/function/lambda_function.py +++ b/lambda-layer/sample-apps/function/lambda_function.py @@ -1,14 +1,39 @@ import json +import logging import os import boto3 import requests +from opentelemetry._logs import LogRecord, SeverityNumber, get_logger + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) +otel_logger = get_logger(__name__) + client = boto3.client("s3") # lambda function def lambda_handler(event, context): + logger.debug("debug-level-test-message") + logger.info("info-level-test-message") + logger.warning("warn-level-test-message") + logger.error("error-level-test-message") + + # Test all attribute data types via OTel Logs API directly + otel_logger.emit( + LogRecord( + body="type-test-message", + severity_number=SeverityNumber.INFO, + attributes={ + "bool_attr": True, + "float_attr": 3.14, + "int_attr": 42, + "string_attr": "hello", + }, + ) + ) requests.get("https://aws.amazon.com/")