diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py index feffa34098..fcf6bbb7e8 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/__init__.py @@ -36,6 +36,7 @@ def loader(): "stepfunctions": _lazy_load(".sfns", "_StepFunctionsExtension"), "sns": _lazy_load(".sns", "_SnsExtension"), "sqs": _lazy_load(".sqs", "_SqsExtension"), + "kinesis": _lazy_load(".kinesis", "_KinesisExtension"), } _AIOBOTOCORE_EXTENSIONS = { diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/kinesis.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/kinesis.py new file mode 100644 index 0000000000..ad51fafe37 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/kinesis.py @@ -0,0 +1,172 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import abc
import json
import logging
import os
from typing import Any, Dict, MutableMapping, Type

from opentelemetry.instrumentation.botocore.extensions.types import (
    _AttributeMapT,
    _AwsSdkCallContext,
    _AwsSdkExtension,
    _BotocoreInstrumentorContext,
)
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import Span

_logger = logging.getLogger(__name__)

# Kinesis rejects records whose Data blob exceeds 1 MB, so context injection
# is skipped rather than producing a record the service would refuse.
_MAX_RECORD_SIZE_BYTES = 1_048_576

################################################################################
# Kinesis operations
################################################################################


class _KinesisOperation(abc.ABC):
    """Describes how to instrument a single Kinesis API operation."""

    @classmethod
    @abc.abstractmethod
    def operation_name(cls) -> str:
        """Return the botocore operation name handled by this class."""

    @classmethod
    def span_kind(cls) -> SpanKind:
        """Span kind for this operation; CLIENT unless overridden."""
        return SpanKind.CLIENT

    @classmethod
    def extract_attributes(
        cls, call_context: _AwsSdkCallContext, attributes: _AttributeMapT
    ):
        """Add operation-specific span attributes. No-op by default."""

    @classmethod
    def before_service_call(cls, call_context: _AwsSdkCallContext, span: Span):
        """Hook invoked just before the request is sent. No-op by default."""


class _OpPutRecord(_KinesisOperation):
    """Instrumentation for the Kinesis ``PutRecord`` operation."""

    @classmethod
    def operation_name(cls) -> str:
        return "PutRecord"

    @classmethod
    def span_kind(cls) -> SpanKind:
        # Publishing a message: PRODUCER per messaging semantic conventions.
        return SpanKind.PRODUCER

    @classmethod
    def _extract_stream_name(cls, call_context: _AwsSdkCallContext) -> str:
        """Return the destination stream name from StreamName or StreamARN.

        Falls back to ``"unknown"`` when neither parameter yields a name.
        """
        stream_name = call_context.params.get("StreamName")
        if stream_name:
            return stream_name

        # Stream ARNs end in "stream/<name>"; everything after the first
        # "/" is the stream name.
        stream_arn = call_context.params.get("StreamARN", "")
        if "/" in stream_arn:
            return stream_arn.split("/", 1)[-1]

        return "unknown"

    @classmethod
    def extract_attributes(
        cls, call_context: _AwsSdkCallContext, attributes: _AttributeMapT
    ):
        stream_name = cls._extract_stream_name(call_context)
        call_context.span_name = f"{stream_name} send"
        attributes[SpanAttributes.MESSAGING_DESTINATION_NAME] = stream_name

    @classmethod
    def before_service_call(cls, call_context: _AwsSdkCallContext, span: Span):
        # For PutRecord the top-level params dict itself carries the Data key.
        cls._inject_span_into_entry(call_context.params)

    @classmethod
    def _inject_span_into_entry(cls, entry: MutableMapping[str, Any]):
        """Inject trace context into the entry's JSON ``Data`` payload.

        The payload is left untouched when it is missing, is not valid
        JSON, is not a JSON object (propagators require a dict-like
        carrier), or would exceed the Kinesis 1 MB record limit after
        injection.
        """
        data = entry.get("Data")
        if data is None:
            return

        use_bytes = False
        try:
            if isinstance(data, bytes):
                data_str = data.decode("utf-8")
                use_bytes = True
            else:
                data_str = data
            data_dict = json.loads(data_str)
        except (json.JSONDecodeError, TypeError, UnicodeDecodeError):
            _logger.debug(
                "botocore instrumentation: failed to parse Kinesis Data as JSON"
            )
            return

        # json.loads accepts any JSON document, so lists and scalars
        # (e.g. b"[1, 2]" or b"42") parse cleanly — but only a JSON object
        # can serve as a propagation carrier; inject() on anything else
        # would raise TypeError inside the request path.
        if not isinstance(data_dict, dict):
            _logger.debug(
                "botocore instrumentation: Kinesis Data is not a JSON object, "
                "skipping context injection"
            )
            return

        inject(data_dict)
        data_dump = json.dumps(data_dict)
        data_bytes = data_dump.encode("utf-8")
        if len(data_bytes) > _MAX_RECORD_SIZE_BYTES:
            _logger.debug(
                "botocore instrumentation: skipping Kinesis context injection, "
                "record would exceed 1MB limit"
            )
            return
        # Preserve the caller's original payload type (bytes vs str).
        entry["Data"] = data_bytes if use_bytes else data_dump


class _OpPutRecords(_OpPutRecord):
    """Instrumentation for the batched ``PutRecords`` operation."""

    @classmethod
    def operation_name(cls) -> str:
        return "PutRecords"

    @classmethod
    def before_service_call(cls, call_context: _AwsSdkCallContext, span: Span):
        # Inject into every record's Data payload independently; records
        # that cannot carry context are simply skipped.
        for record in call_context.params.get("Records", []):
            cls._inject_span_into_entry(record)


################################################################################
# Kinesis extension
################################################################################

# Keyed by operation_name() so the mapping cannot drift out of sync with the
# operation classes; values are the classes themselves (used via classmethods).
_OPERATION_MAPPING: Dict[str, Type[_KinesisOperation]] = {
    op.operation_name(): op for op in (_OpPutRecord, _OpPutRecords)
}


# Opt-in feature flag, evaluated once at import time.
_ENABLED = os.environ.get("ENABLE_KINESIS_INSTRUMENTATION", "").lower() == "true"


class _KinesisExtension(_AwsSdkExtension):
    """botocore extension adding messaging semantics to Kinesis calls.

    Does nothing unless the ENABLE_KINESIS_INSTRUMENTATION environment
    variable was set to "true" when this module was imported.
    """

    def __init__(self, call_context: _AwsSdkCallContext):
        super().__init__(call_context)
        if not _ENABLED:
            self._op = None
            return
        self._op = _OPERATION_MAPPING.get(call_context.operation)
        if self._op:
            call_context.span_kind = self._op.span_kind()

    def extract_attributes(self, attributes: _AttributeMapT):
        if not _ENABLED:
            return

        attributes[SpanAttributes.MESSAGING_SYSTEM] = "aws.kinesis"

        if self._op:
            self._op.extract_attributes(self._call_context, attributes)

    def before_service_call(
        self, span: Span, instrumentor_context: _BotocoreInstrumentorContext
    ):
        if self._op:
            self._op.before_service_call(self._call_context, span)
import contextlib
import json
from typing import Any, Dict
from unittest import mock

import botocore.session
from botocore.awsrequest import AWSResponse

from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import Span


class TestKinesisExtension(TestBase):
    """Exercises the opt-in Kinesis botocore extension end to end."""

    def setUp(self):
        super().setUp()
        # Force the feature flag on regardless of the test environment.
        self._enabled_patcher = mock.patch(
            "opentelemetry.instrumentation.botocore.extensions.kinesis._ENABLED",
            True,
        )
        self._enabled_patcher.start()
        BotocoreInstrumentor().instrument()

        session = botocore.session.get_session()
        session.set_credentials(
            access_key="access-key", secret_key="secret-key"
        )
        self.client = session.create_client(
            "kinesis", region_name="us-west-2"
        )
        self.stream_name = "my-stream"

    def tearDown(self):
        super().tearDown()
        BotocoreInstrumentor().uninstrument()
        self._enabled_patcher.stop()

    @contextlib.contextmanager
    def _mocked_aws_endpoint(self, response):
        # Short-circuit botocore's HTTP layer so no real AWS call happens.
        def _fake_make_request(*_args, **_kwargs):
            http_response = AWSResponse("http://127.0.0.1", 200, {}, "{}")
            return http_response, response

        with mock.patch(
            "botocore.endpoint.Endpoint.make_request",
            new=_fake_make_request,
        ):
            yield

    def assert_span(self, name: str) -> Span:
        """Assert exactly one finished PRODUCER span with Kinesis attrs."""
        finished = self.memory_exporter.get_finished_spans()
        self.assertEqual(1, len(finished))
        (span,) = finished

        self.assertEqual(SpanKind.PRODUCER, span.kind)
        self.assertEqual(name, span.name)
        self.assertEqual(
            "aws.kinesis", span.attributes[SpanAttributes.MESSAGING_SYSTEM]
        )
        return span

    def assert_injected_span(self, data_dict: Dict[str, Any], span: Span):
        """Verify the carrier's traceparent matches the exported span."""
        _version, trace_id_hex, span_id_hex, _flags = data_dict[
            "traceparent"
        ].split("-")
        ctx = span.get_span_context()
        self.assertEqual(ctx.trace_id, int(trace_id_hex, 16))
        self.assertEqual(ctx.span_id, int(span_id_hex, 16))

    def test_put_record_injects_span(self):
        aws_reply = {
            "ShardId": "shardId-000000000000",
            "SequenceNumber": "12345",
        }

        with self._mocked_aws_endpoint(aws_reply):
            self.client.put_record(
                StreamName=self.stream_name,
                Data=json.dumps({"key": "value"}).encode("utf-8"),
                PartitionKey="pk",
            )

        span = self.assert_span(f"{self.stream_name} send")
        self.assertEqual(
            self.stream_name,
            span.attributes[SpanAttributes.MESSAGING_DESTINATION_NAME],
        )

    def test_put_records_injects_span(self):
        aws_reply = {
            "FailedRecordCount": 0,
            "Records": [
                {"ShardId": "shardId-000000000000", "SequenceNumber": "1"},
                {"ShardId": "shardId-000000000000", "SequenceNumber": "2"},
            ],
        }

        batch = [
            {
                "Data": json.dumps({"key1": "value1"}).encode("utf-8"),
                "PartitionKey": "pk1",
            },
            {
                "Data": json.dumps({"key2": "value2"}).encode("utf-8"),
                "PartitionKey": "pk2",
            },
        ]

        with self._mocked_aws_endpoint(aws_reply):
            self.client.put_records(
                StreamName=self.stream_name,
                Records=batch,
            )

        span = self.assert_span(f"{self.stream_name} send")
        self.assertEqual(
            self.stream_name,
            span.attributes[SpanAttributes.MESSAGING_DESTINATION_NAME],
        )

        # Every record's Data payload must now carry a traceparent.
        for entry in batch:
            payload = entry["Data"]
            if isinstance(payload, bytes):
                payload = payload.decode("utf-8")
            self.assert_injected_span(json.loads(payload), span)

    def test_put_record_stream_arn(self):
        aws_reply = {
            "ShardId": "shardId-000000000000",
            "SequenceNumber": "12345",
        }

        payload = json.dumps({"key": "value"}).encode("utf-8")

        with self._mocked_aws_endpoint(aws_reply):
            self.client.put_record(
                StreamARN="arn:aws:kinesis:us-west-2:123456789012:stream/my-arn-stream",
                Data=payload,
                PartitionKey="pk",
            )

        span = self.assert_span("my-arn-stream send")
        self.assertEqual(
            "my-arn-stream",
            span.attributes[SpanAttributes.MESSAGING_DESTINATION_NAME],
        )

    def test_put_record_non_json_data(self):
        aws_reply = {
            "ShardId": "shardId-000000000000",
            "SequenceNumber": "12345",
        }

        with self._mocked_aws_endpoint(aws_reply):
            self.client.put_record(
                StreamName=self.stream_name,
                Data=b"this is not json",
                PartitionKey="pk",
            )

        # Injection is skipped but the span is still produced.
        span = self.assert_span(f"{self.stream_name} send")
        self.assertIsNotNone(span)

    def test_put_record_skips_injection_when_exceeds_1mb(self):
        aws_reply = {
            "ShardId": "shardId-000000000000",
            "SequenceNumber": "12345",
        }

        # JSON payload just under 1MB, so adding trace context would push
        # the serialized record over the limit.
        padding = "x" * (1_048_576 - 20)
        original_data = json.dumps({"key": padding}).encode("utf-8")

        with self._mocked_aws_endpoint(aws_reply):
            self.client.put_record(
                StreamName=self.stream_name,
                Data=original_data,
                PartitionKey="pk",
            )

        # Span creation is unaffected by the skipped injection.
        span = self.assert_span(f"{self.stream_name} send")
        self.assertIsNotNone(span)

        # Re-parse the original payload to confirm it was left untouched:
        # no traceparent may have been injected.
        parsed = json.loads(original_data.decode("utf-8"))
        self.assertNotIn("traceparent", parsed)

    def test_put_record_empty_data(self):
        aws_reply = {
            "ShardId": "shardId-000000000000",
            "SequenceNumber": "12345",
        }

        with self._mocked_aws_endpoint(aws_reply):
            self.client.put_record(
                StreamName=self.stream_name,
                Data=b"",
                PartitionKey="pk",
            )

        # Empty payloads must not crash the extension.
        span = self.assert_span(f"{self.stream_name} send")
        self.assertIsNotNone(span)

    @mock.patch(
        "opentelemetry.instrumentation.botocore.extensions.kinesis._ENABLED",
        False,
    )
    def test_disabled_when_env_var_not_set(self):
        aws_reply = {
            "ShardId": "shardId-000000000000",
            "SequenceNumber": "12345",
        }

        payload = json.dumps({"key": "value"}).encode("utf-8")

        with self._mocked_aws_endpoint(aws_reply):
            self.client.put_record(
                StreamName=self.stream_name,
                Data=payload,
                PartitionKey="pk",
            )

        finished = self.memory_exporter.get_finished_spans()
        self.assertEqual(1, len(finished))
        (span,) = finished

        # With the flag off, only a plain CLIENT span is emitted and no
        # messaging attributes appear.
        self.assertEqual(SpanKind.CLIENT, span.kind)
        self.assertNotIn(
            SpanAttributes.MESSAGING_SYSTEM, span.attributes
        )
        self.assertNotIn(
            SpanAttributes.MESSAGING_DESTINATION_NAME, span.attributes
        )

        # The payload must also be left untouched.
        parsed = json.loads(payload.decode("utf-8"))
        self.assertNotIn("traceparent", parsed)