diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py index 00da544325..be740bd8be 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py @@ -57,14 +57,34 @@ def extract_send_headers(args, kwargs): @staticmethod def extract_send_partition(instance, args, kwargs): - """extract partition `send` method arguments, using the `_partition` method in KafkaProducer class""" + """Extract partition from `send` method arguments. + + Only returns a partition value when it can be determined deterministically: + - When partition is explicitly provided by the caller + - When a key is provided (partition is determined by key hash) + + Returns None when partition would be randomly assigned by the + DefaultPartitioner, since calling _partition() here would produce a + different result than the actual send() call. + """ + partition = KafkaPropertiesExtractor._extract_argument( + "partition", 4, None, args, kwargs + ) + # If partition is explicitly set, return it directly + if partition is not None: + return partition + + key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) + # If no key is provided and no explicit partition, the DefaultPartitioner + # will randomly select a partition. We cannot predict which one, so + # return None to avoid recording an incorrect value. + if key is None: + return None + + # Key is provided — partition is deterministic (hash-based), safe to compute try: topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs) - key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) value = KafkaPropertiesExtractor.extract_send_value(args, kwargs) - partition = KafkaPropertiesExtractor._extract_argument( - "partition", 4, None, args, kwargs - ) key_bytes = instance._serialize( instance.config["key_serializer"], topic, key ) @@ -131,7 +151,10 @@ def _enrich_span( if span.is_recording(): span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka") span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic) - span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) + if partition is not None: + span.set_attribute( + SpanAttributes.MESSAGING_KAFKA_PARTITION, partition + ) span.set_attribute( SpanAttributes.MESSAGING_URL, json.dumps(bootstrap_servers) )