diff --git a/CHANGELOG.md b/CHANGELOG.md index d9e48cd6df..7fb77e2587 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased ### Added +- Add `BaggageLogProcessor` to `opentelemetry-processor-baggage` + ([#4371](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4371)) - `opentelemetry-instrumentation-asgi`: Respect `suppress_http_instrumentation` context in ASGI middleware to skip server span creation when HTTP instrumentation is suppressed ([#4375](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4375)) diff --git a/processor/opentelemetry-processor-baggage/README.rst b/processor/opentelemetry-processor-baggage/README.rst index 9c49a6ff75..f8dfe4e9f5 100644 --- a/processor/opentelemetry-processor-baggage/README.rst +++ b/processor/opentelemetry-processor-baggage/README.rst @@ -65,6 +65,51 @@ For example, to only copy baggage entries that match the regex `^key.+`: regex_predicate = lambda baggage_key: baggage_key.startswith("^key.+") tracer_provider.add_span_processor(BaggageSpanProcessor(regex_predicate)) +BaggageLogProcessor +------------------- + +The BaggageLogProcessor reads entries stored in Baggage +from the current context and adds the baggage entries' keys and +values to the log record as attributes on emit. + +Add this log processor to a logger provider. + +To configure the log processor to copy all baggage entries: + +:: + + from opentelemetry.processor.baggage import BaggageLogProcessor, ALLOW_ALL_BAGGAGE_KEYS + + logger_provider = LoggerProvider() + logger_provider.add_log_record_processor(BaggageLogProcessor(ALLOW_ALL_BAGGAGE_KEYS)) + + +Alternatively, you can provide a custom baggage key predicate to select which baggage keys you want to copy. + +For example, to only copy baggage entries that start with `my-key`: + +:: + + starts_with_predicate = lambda baggage_key: baggage_key.startswith("my-key") + logger_provider.add_log_record_processor(BaggageLogProcessor(starts_with_predicate)) + + +For example, to only copy baggage entries that match the regex `^key.+`: + +:: + + regex_predicate = lambda baggage_key: re.match(r"^key.+", baggage_key) is not None + logger_provider.add_log_record_processor(BaggageLogProcessor(regex_predicate)) + +For example, to copy baggage entries matching multiple predicates: + +:: + + multiple_predicates = [ + lambda baggage_key: baggage_key.startswith("my-key"), + lambda baggage_key: baggage_key.startswith("other-key"), + ] + logger_provider.add_log_record_processor(BaggageLogProcessor(multiple_predicates)) References ---------- diff --git a/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/__init__.py b/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/__init__.py index fcff749d64..99b764bad8 100644 --- a/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/__init__.py +++ b/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/__init__.py @@ -13,8 +13,7 @@ # limitations under the License. # pylint: disable=import-error - from .processor import ALLOW_ALL_BAGGAGE_KEYS, BaggageSpanProcessor +from .log_processor import BaggageLogProcessor from .version import __version__ - -__all__ = ["ALLOW_ALL_BAGGAGE_KEYS", "BaggageSpanProcessor", "__version__"] +__all__ = ["ALLOW_ALL_BAGGAGE_KEYS", "BaggageSpanProcessor", "BaggageLogProcessor", "__version__"] \ No newline at end of file diff --git a/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/log_processor.py b/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/log_processor.py new file mode 100644 index 0000000000..d9fa7676c1 --- /dev/null +++ b/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/log_processor.py @@ -0,0 +1,81 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Sequence, Union + +from opentelemetry.baggage import get_all as get_all_baggage +from opentelemetry.processor.baggage.processor import BaggageKeyPredicateT +from opentelemetry.sdk._logs import LogRecordProcessor, ReadWriteLogRecord + +BaggageKeyPredicate = Union[ + BaggageKeyPredicateT, Sequence[BaggageKeyPredicateT] +] + + +class BaggageLogProcessor(LogRecordProcessor): + """ + The BaggageLogProcessor reads entries stored in Baggage + from the current context and adds the baggage entries' keys and + values to the log record as attributes on emit. + + Add this log processor to a logger provider. + + ⚠ Warning ⚠️ + + Do not put sensitive information in Baggage. + + To repeat: a consequence of adding data to Baggage is that the keys and + values will appear in all outgoing HTTP headers from the application. + """ + + def __init__( + self, + baggage_key_predicate: BaggageKeyPredicate, + max_baggage_attributes: int = 128, + ) -> None: + if callable(baggage_key_predicate): + self._predicates = [baggage_key_predicate] + else: + self._predicates = list(baggage_key_predicate) + self._max_baggage_attributes = max_baggage_attributes + + def _matches(self, key: str) -> bool: + return any(predicate(key) for predicate in self._predicates) + + def on_emit(self, log_record: ReadWriteLogRecord) -> None: + """Add baggage entries as log record attributes on emit. + + Baggage keys are filtered using the provided predicate(s). + If a baggage key already exists in the log record attributes, + it will not be overwritten to avoid collisions with attributes + added by stdlib logging, calls to logging.emit, or custom + LogRecordProcessors. At most max_baggage_attributes baggage + entries will be added. + """ + baggage = get_all_baggage() + count = 0 + for key, value in baggage.items(): + if count >= self._max_baggage_attributes: + break + if self._matches(key): + if key not in log_record.log_record.attributes: + log_record.log_record.attributes[key] = value + count += 1 + + def shutdown(self) -> None: + pass + + @staticmethod + def force_flush(timeout_millis: int = 30000) -> bool: + return True diff --git a/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/processor.py b/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/processor.py index 7e09e591e0..dd3e2a0e53 100644 --- a/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/processor.py +++ b/processor/opentelemetry-processor-baggage/src/opentelemetry/processor/baggage/processor.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Optional +from typing import Callable, Optional, Sequence, Union from opentelemetry.baggage import get_all as get_all_baggage from opentelemetry.context import Context @@ -21,6 +21,9 @@ # A BaggageKeyPredicate is a function that takes a baggage key and returns a boolean BaggageKeyPredicateT = Callable[[str], bool] +BaggageKeyPredicates = Union[ + BaggageKeyPredicateT, Sequence[BaggageKeyPredicateT] +] # A BaggageKeyPredicate that always returns True, allowing all baggage keys to be added to spans ALLOW_ALL_BAGGAGE_KEYS: BaggageKeyPredicateT = lambda _: True # noqa: E731 @@ -50,13 +53,16 @@ class BaggageSpanProcessor(SpanProcessor): """ - def __init__(self, baggage_key_predicate: BaggageKeyPredicateT) -> None: - self._baggage_key_predicate = baggage_key_predicate + def __init__(self, baggage_key_predicate: BaggageKeyPredicates) -> None: + if callable(baggage_key_predicate): + self._predicates = [baggage_key_predicate] + else: + self._predicates = list(baggage_key_predicate) def on_start( self, span: "Span", parent_context: Optional[Context] = None ) -> None: baggage = get_all_baggage(parent_context) for key, value in baggage.items(): - if self._baggage_key_predicate(key): + if any(predicate(key) for predicate in self._predicates): span.set_attribute(key, value) diff --git a/processor/opentelemetry-processor-baggage/tests/test_baggage_log_processor.py b/processor/opentelemetry-processor-baggage/tests/test_baggage_log_processor.py new file mode 100644 index 0000000000..1b3f3f185e --- /dev/null +++ b/processor/opentelemetry-processor-baggage/tests/test_baggage_log_processor.py @@ -0,0 +1,163 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import re +import unittest + +from opentelemetry.baggage import set_baggage +from opentelemetry.context import attach, detach +from opentelemetry.processor.baggage import ( + ALLOW_ALL_BAGGAGE_KEYS, + BaggageLogProcessor, +) +from opentelemetry.sdk._logs import LoggerProvider, LogRecordProcessor +from opentelemetry.sdk._logs.export import ( + InMemoryLogRecordExporter, + BatchLogRecordProcessor, +) + + +class BaggageLogProcessorTest(unittest.TestCase): + def setUp(self): + self.exporter = InMemoryLogRecordExporter() + self.logger_provider = LoggerProvider() + self.logger_provider.add_log_record_processor( + BaggageLogProcessor(ALLOW_ALL_BAGGAGE_KEYS) + ) + self.logger_provider.add_log_record_processor( + BatchLogRecordProcessor(self.exporter) + ) + self.logger = self.logger_provider.get_logger("test-logger") + + def _get_attributes(self): + self.logger_provider.force_flush() + logs = self.exporter.get_finished_logs() + self.assertTrue(len(logs) > 0) + return logs[-1].log_record.attributes + + def test_check_the_baggage(self): + self.assertIsInstance( + BaggageLogProcessor(ALLOW_ALL_BAGGAGE_KEYS), LogRecordProcessor + ) + + def test_baggage_added_to_log_record(self): + token = attach(set_baggage("queen", "bee")) + self.logger.emit(None) + attributes = self._get_attributes() + self.assertEqual(attributes.get("queen"), "bee") + detach(token) + + def test_baggage_with_prefix(self): + token = attach(set_baggage("queen", "bee")) + logger_provider = LoggerProvider() + logger_provider.add_log_record_processor( + BaggageLogProcessor(lambda key: key.startswith("que")) + ) + exporter = InMemoryLogRecordExporter() + logger_provider.add_log_record_processor( + BatchLogRecordProcessor(exporter) + ) + logger = logger_provider.get_logger("test-logger") + logger.emit(None) + logger_provider.force_flush() + logs = exporter.get_finished_logs() + attributes = logs[-1].log_record.attributes + self.assertEqual(attributes.get("queen"), "bee") + detach(token) + + def test_baggage_with_regex(self): + token = attach(set_baggage("queen", "bee")) + logger_provider = LoggerProvider() + logger_provider.add_log_record_processor( + BaggageLogProcessor( + lambda key: re.match(r"que.*", key) is not None + ) + ) + exporter = InMemoryLogRecordExporter() + logger_provider.add_log_record_processor( + BatchLogRecordProcessor(exporter) + ) + logger = logger_provider.get_logger("test-logger") + logger.emit(None) + logger_provider.force_flush() + logs = exporter.get_finished_logs() + attributes = logs[-1].log_record.attributes + self.assertEqual(attributes.get("queen"), "bee") + detach(token) + + def test_no_baggage_not_added(self): + self.logger.emit(None) + self.logger_provider.force_flush() + logs = self.exporter.get_finished_logs() + self.assertTrue(len(logs) > 0) + attributes = logs[-1].log_record.attributes + self.assertNotIn("queen", attributes) + + def test_multiple_predicates(self): + token1 = attach(set_baggage("queen", "bee")) + token2 = attach(set_baggage("king", "cobra")) + logger_provider = LoggerProvider() + logger_provider.add_log_record_processor( + BaggageLogProcessor([ + lambda key: key.startswith("que"), + lambda key: key.startswith("kin"), + ]) + ) + exporter = InMemoryLogRecordExporter() + logger_provider.add_log_record_processor( + BatchLogRecordProcessor(exporter) + ) + logger = logger_provider.get_logger("test-logger") + logger.emit(None) + logger_provider.force_flush() + logs = exporter.get_finished_logs() + attributes = logs[-1].log_record.attributes + self.assertEqual(attributes.get("queen"), "bee") + self.assertEqual(attributes.get("king"), "cobra") + detach(token2) + detach(token1) + + def test_max_baggage_attributes_limit(self): + token1 = attach(set_baggage("key1", "val1")) + token2 = attach(set_baggage("key2", "val2")) + token3 = attach(set_baggage("key3", "val3")) + logger_provider = LoggerProvider() + logger_provider.add_log_record_processor( + BaggageLogProcessor(ALLOW_ALL_BAGGAGE_KEYS, max_baggage_attributes=2) + ) + exporter = InMemoryLogRecordExporter() + logger_provider.add_log_record_processor( + BatchLogRecordProcessor(exporter) + ) + logger = logger_provider.get_logger("test-logger") + logger.emit(None) + logger_provider.force_flush() + logs = exporter.get_finished_logs() + attributes = logs[-1].log_record.attributes + self.assertEqual(len(attributes), 2) + detach(token3) + detach(token2) + detach(token1) + + @staticmethod + def has_prefix(baggage_key: str) -> bool: + return baggage_key.startswith("que") + + @staticmethod + def matches_regex(baggage_key: str) -> bool: + return re.match(r"que.*", baggage_key) is not None + + +if __name__ == "__main__": + unittest.main()