From 408adce4044406be309b2d627cd23734df700bef Mon Sep 17 00:00:00 2001 From: gauravSsinha Date: Tue, 26 May 2026 01:34:04 -0500 Subject: [PATCH] fix(kafka): avoid recording incorrect partition when using random partitioner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When no explicit partition and no key are provided, the DefaultPartitioner randomly selects a partition. The instrumentation was calling _partition() to record this value in the span, but the actual send() method calls _partition() again independently, potentially selecting a different partition. This caused the span's messaging.kafka.partition attribute to not match the actual partition where the message was delivered. Fix: only compute partition in the instrumentation when the result is deterministic — either explicitly provided by the caller or determined by key hash. When partition would be randomly assigned, omit the attribute rather than record an incorrect value. Fixes open-telemetry/opentelemetry-python-contrib#4625 Signed-off-by: Gaurav Kumar Sinha --- .../instrumentation/kafka/utils.py | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py index 00da544325..be740bd8be 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py @@ -57,14 +57,34 @@ def extract_send_headers(args, kwargs): @staticmethod def extract_send_partition(instance, args, kwargs): - """extract partition `send` method arguments, using the `_partition` method in KafkaProducer class""" + """Extract partition from `send` method arguments. + + Only returns a partition value when it can be determined deterministically: + - When partition is explicitly provided by the caller + - When a key is provided (partition is determined by key hash) + + Returns None when partition would be randomly assigned by the + DefaultPartitioner, since calling _partition() here would produce a + different result than the actual send() call. + """ + partition = KafkaPropertiesExtractor._extract_argument( + "partition", 4, None, args, kwargs + ) + # If partition is explicitly set, return it directly + if partition is not None: + return partition + + key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) + # If no key is provided and no explicit partition, the DefaultPartitioner + # will randomly select a partition. We cannot predict which one, so + # return None to avoid recording an incorrect value. + if key is None: + return None + + # Key is provided — partition is deterministic (hash-based), safe to compute try: topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs) - key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) value = KafkaPropertiesExtractor.extract_send_value(args, kwargs) - partition = KafkaPropertiesExtractor._extract_argument( - "partition", 4, None, args, kwargs - ) key_bytes = instance._serialize( instance.config["key_serializer"], topic, key ) @@ -131,7 +151,10 @@ def _enrich_span( if span.is_recording(): span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka") span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic) - span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) + if partition is not None: + span.set_attribute( + SpanAttributes.MESSAGING_KAFKA_PARTITION, partition + ) span.set_attribute( SpanAttributes.MESSAGING_URL, json.dumps(bootstrap_servers) )