
Incorrect partition in opentelemetry-instrumentation-kafka-python #4625

@mznet

Description


Describe your environment

OS: macOS Tahoe 26.3
Python version: 3.13.9
Package version: 0.63b0.dev

What happened?

Before the Kafka producer sends a record, the instrumentation creates a producer span. When the DefaultPartitioner is used, no key is set, and no partition is explicitly passed to send(), I found that opentelemetry-instrumentation-kafka-python records an incorrect Kafka partition on the span.

The instrumentation obtains the partition by calling instance._partition and assigns the result to the messaging.kafka.partition attribute. However, when no key is set, _partition chooses a partition at random from the candidate partitions. The partition chosen in this call is used only for the span attribute; it is not the partition the record is actually sent to.

After that, the producer's send() method calls _partition again, and this second, independent call selects the partition the record is actually sent to.
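The keyless selection can be modeled in a few lines to show why two independent calls rarely agree. This is a simplified sketch, not kafka-python's source: `keyless_partition` is a hypothetical helper that assumes a uniform random pick among available partitions (falling back to all partitions), as the issue describes.

```python
import random
from typing import Sequence


def keyless_partition(all_partitions: Sequence[int],
                      available: Sequence[int],
                      rng: random.Random) -> int:
    """Hypothetical model of the key=None case: uniform pick among available
    partitions, falling back to all partitions when none are available."""
    candidates = available if available else all_partitions
    return rng.choice(list(candidates))


rng = random.Random(42)  # fixed seed so runs are repeatable
partitions = list(range(12))  # NUM_PARTITIONS = 12, as in the repro script

# One pick for the span attribute, an independent pick for the actual send.
pairs = [
    (keyless_partition(partitions, partitions, rng),
     keyless_partition(partitions, partitions, rng))
    for _ in range(1000)
]
mismatches = sum(1 for span_p, sent_p in pairs if span_p != sent_p)
print(f"{mismatches} of {len(pairs)} simulated spans record the wrong partition")
```

Under this uniform model with 12 partitions, the two picks agree only about 1 time in 12, so most simulated spans carry the wrong partition.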

Because of this, the partition recorded in the span can be different from the actual partition used by the producer.
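The double call, and one possible way to avoid it, can be sketched with a stand-in producer. Everything here is hypothetical: `FakeProducer` only mimics the relevant behavior (an explicit `partition` is returned unchanged, otherwise one is picked at random), and `traced_send_fixed` illustrates one approach, resolving the partition once and passing it to `send()` explicitly, rather than a fix the maintainers have adopted. kafka-python's `KafkaProducer.send()` does accept a `partition` argument, which is what makes this approach plausible.

```python
import random
from typing import Callable, Dict, List, Optional


class FakeProducer:
    """Hypothetical stand-in for KafkaProducer, modeling only keyless sends."""

    def __init__(self, num_partitions: int, seed: int = 0) -> None:
        self._rng = random.Random(seed)
        self._num_partitions = num_partitions
        self.sent: List[int] = []  # partitions the records actually went to

    def _partition(self, partition: Optional[int]) -> int:
        # An explicit partition is returned unchanged; otherwise pick at random.
        if partition is not None:
            return partition
        return self._rng.randrange(self._num_partitions)

    def send(self, value: bytes, partition: Optional[int] = None) -> int:
        chosen = self._partition(partition)  # the second call, inside send()
        self.sent.append(chosen)
        return chosen


Span = Dict[str, int]  # a plain dict standing in for span attributes


def traced_send_buggy(producer: FakeProducer, value: bytes, span: Span) -> int:
    # Pattern described in the issue: one pick for the span, another in send().
    span["messaging.kafka.partition"] = producer._partition(None)
    return producer.send(value)


def traced_send_fixed(producer: FakeProducer, value: bytes, span: Span) -> int:
    # One possible approach: decide once, then pass that decision to send().
    partition = producer._partition(None)
    span["messaging.kafka.partition"] = partition
    return producer.send(value, partition=partition)


def count_mismatches(send_fn: Callable[[FakeProducer, bytes, Span], int],
                     n: int = 200) -> int:
    producer = FakeProducer(num_partitions=12, seed=1)
    mismatches = 0
    for i in range(n):
        span: Span = {}
        actual = send_fn(producer, f"v{i}".encode(), span)
        if span["messaging.kafka.partition"] != actual:
            mismatches += 1
    return mismatches


print("buggy:", count_mismatches(traced_send_buggy))
print("fixed:", count_mismatches(traced_send_fixed))
```

The buggy variant reports a mismatch for most records, while the fixed variant reports none, because the span attribute and the send share a single partition decision.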

Steps to Reproduce

Reproducible example:

# kafka_test.py

import sys

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from opentelemetry import trace
from opentelemetry.instrumentation.kafka import KafkaInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor


# Print each finished span to stdout as JSON.
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

TOPIC = "test-topic"
NUM_PARTITIONS = 12
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    admin.create_topics([NewTopic(TOPIC, num_partitions=NUM_PARTITIONS, replication_factor=1)])
except Exception:
    pass

# Copy the message value and key onto the producer span so spans can be matched to messages.
def produce_hook(span, args, kwargs):
    if span and span.is_recording():
        value = kwargs.get("value") if "value" in kwargs else (args[1] if len(args) > 1 else None)
        if value is not None:
            span.set_attribute(
                "messaging.kafka.message.value",
                value.decode() if isinstance(value, (bytes, bytearray)) else str(value),
            )
        key = kwargs.get("key") if "key" in kwargs else (args[2] if len(args) > 2 else None)
        if key is not None:
            span.set_attribute(
                "messaging.kafka.message.key",
                key.decode() if isinstance(key, (bytes, bytearray)) else str(key),
            )


KafkaInstrumentor().instrument(produce_hook=produce_hook)
producer = KafkaProducer(bootstrap_servers="localhost:9092")

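N = 10

for i in range(N):
    fut = producer.send(TOPIC, value=f"v{i}".encode())
    actual = fut.get(timeout=10).partition
    # Report the broker-assigned partition on stderr so stdout stays valid JSON for jq.
    print(f"v{i} -> partition {actual}", file=sys.stderr)

producer.close()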

Expected Result

For each message, the messaging.kafka.partition attribute on the span should match the partition reported by kafka-console-consumer.

Actual Result

kafka_test.py result:

python kafka_test.py | jq -r '
  "messaging.kafka.message.value : \(.attributes."messaging.kafka.message.value")",
  "messaging.kafka.partition : \(.attributes."messaging.kafka.partition")",
  "------------------------------------"
'

messaging.kafka.message.value : v0
messaging.kafka.partition : null (should be 0)
------------------------------------
messaging.kafka.message.value : v1
messaging.kafka.partition : 6 
------------------------------------
messaging.kafka.message.value : v2
messaging.kafka.partition : 11
------------------------------------
messaging.kafka.message.value : v3
messaging.kafka.partition : 3 (should be 9)
------------------------------------
messaging.kafka.message.value : v4
messaging.kafka.partition : 7 (should be 10)
------------------------------------
messaging.kafka.message.value : v5
messaging.kafka.partition : 4
------------------------------------
messaging.kafka.message.value : v6
messaging.kafka.partition : 4 (should be 7)
------------------------------------
messaging.kafka.message.value : v7
messaging.kafka.partition : 10 (should be 9)
------------------------------------
messaging.kafka.message.value : v8
messaging.kafka.partition : 7 (should be 9)
------------------------------------
messaging.kafka.message.value : v9
messaging.kafka.partition : 9
------------------------------------

kafka-console-consumer result:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic test-topic \
  --property print.partition=true

Partition:0	v0
Partition:6	v1
Partition:11	v2
Partition:9	v3
Partition:10	v4
Partition:4	v5
Partition:7	v6
Partition:9	v7
Partition:9	v8
Partition:9	v9
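The two outputs above can be paired by message value to list the disagreements. This sketch assumes both outputs were saved as text in exactly the formats shown (with the hand-written "(should be N)" notes removed) and that each message value is unique, which holds for v0 to v9 here; the function names are hypothetical.

```python
import re
from typing import Dict, List, Optional, Tuple

# jq output from above, without the "(should be N)" notes.
SPAN_OUTPUT = """\
messaging.kafka.message.value : v0
messaging.kafka.partition : null
------------------------------------
messaging.kafka.message.value : v1
messaging.kafka.partition : 6
------------------------------------
messaging.kafka.message.value : v2
messaging.kafka.partition : 11
------------------------------------
messaging.kafka.message.value : v3
messaging.kafka.partition : 3
------------------------------------
messaging.kafka.message.value : v4
messaging.kafka.partition : 7
------------------------------------
messaging.kafka.message.value : v5
messaging.kafka.partition : 4
------------------------------------
messaging.kafka.message.value : v6
messaging.kafka.partition : 4
------------------------------------
messaging.kafka.message.value : v7
messaging.kafka.partition : 10
------------------------------------
messaging.kafka.message.value : v8
messaging.kafka.partition : 7
------------------------------------
messaging.kafka.message.value : v9
messaging.kafka.partition : 9
------------------------------------
"""

# kafka-console-consumer output from above (tab-separated).
CONSUMER_OUTPUT = """\
Partition:0\tv0
Partition:6\tv1
Partition:11\tv2
Partition:9\tv3
Partition:10\tv4
Partition:4\tv5
Partition:7\tv6
Partition:9\tv7
Partition:9\tv8
Partition:9\tv9
"""


def parse_span_output(text: str) -> Dict[str, Optional[int]]:
    """Map message value -> partition recorded on the span (None for null)."""
    result: Dict[str, Optional[int]] = {}
    current_value: Optional[str] = None
    for line in text.splitlines():
        m = re.match(r"messaging\.kafka\.message\.value : (\S+)", line)
        if m:
            current_value = m.group(1)
            continue
        m = re.match(r"messaging\.kafka\.partition : (\S+)", line)
        if m and current_value is not None:
            raw = m.group(1)
            result[current_value] = None if raw == "null" else int(raw)
            current_value = None
    return result


def parse_consumer_output(text: str) -> Dict[str, int]:
    """Map message value -> partition printed by kafka-console-consumer."""
    result: Dict[str, int] = {}
    for line in text.splitlines():
        m = re.match(r"Partition:(\d+)\s+(\S+)", line)
        if m:
            result[m.group(2)] = int(m.group(1))
    return result


def find_mismatches(span: Dict[str, Optional[int]],
                    consumed: Dict[str, int]) -> List[Tuple[str, Optional[int], int]]:
    """Return (value, span partition, actual partition) wherever they differ."""
    return [(v, span.get(v), p) for v, p in sorted(consumed.items()) if span.get(v) != p]


for value, recorded, actual in find_mismatches(parse_span_output(SPAN_OUTPUT),
                                               parse_consumer_output(CONSUMER_OUTPUT)):
    print(f"{value}: span={recorded} actual={actual}")
```

With the data above this flags six of the ten messages: v0, v3, v4, v6, v7 and v8.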

Additional context

No response

Would you like to implement a fix?

None


Metadata

Assignees: No one assigned
Labels: bug (Something isn't working)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests