Describe your environment
OS: macOS Tahoe 26.3
Python version: 3.13.9
Package version: 0.63b0.dev
What happened?
Before the Kafka producer sends a record, the instrumentation creates a producer span. I found that opentelemetry-instrumentation-kafka-python records an incorrect Kafka partition in that span when all of the following hold: `DefaultPartitioner` is used, no key is set, and no partition is set explicitly.
To fill the `messaging.kafka.partition` attribute, the instrumentation calls `instance._partition` itself. For a record without a key, `_partition` chooses one partition at random from the candidate partitions. The partition picked by this call is used only for the span; it is not the partition the record is sent to.
`KafkaProducer.send` then calls `_partition` again, which makes a second, independent random choice. This second result is the partition the record is actually written to.
As a result, the partition recorded in the span can differ from the partition the producer actually uses.
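The effect of calling the partitioner twice can be illustrated without a broker. The sketch below is a simplified model, not the instrumentation's or kafka-python's code: `choose_partition` and `simulate` are hypothetical helpers, the unkeyed branch is assumed to be a uniform random choice, and the keyed branch uses `zlib.crc32` as a stand-in for the murmur2 hash that kafka-python's `DefaultPartitioner` uses. Each message goes through the partitioner twice (once for the span, once for the real send) and the script counts how often the two results differ.

```python
import random
import zlib


def choose_partition(key_bytes, partitions, rng):
    # Hypothetical stand-in for a default partitioner, NOT kafka-python's code.
    if key_bytes is None:
        # No key: pick one of the candidate partitions at random.
        return rng.choice(partitions)
    # Key present: deterministic hash, so repeated calls agree.
    # zlib.crc32 stands in for the murmur2 hash kafka-python uses.
    return partitions[(zlib.crc32(key_bytes) & 0x7FFFFFFF) % len(partitions)]


def simulate(key_bytes, n_messages=1000, num_partitions=12, seed=0):
    """Call the partitioner twice per message and count disagreements."""
    rng = random.Random(seed)
    partitions = list(range(num_partitions))
    mismatches = 0
    for _ in range(n_messages):
        # First call: the value the instrumentation records on the span.
        span_partition = choose_partition(key_bytes, partitions, rng)
        # Second call: the value send() actually uses.
        actual_partition = choose_partition(key_bytes, partitions, rng)
        if span_partition != actual_partition:
            mismatches += 1
    return mismatches


if __name__ == "__main__":
    print("keyed mismatches:", simulate(b"order-42"))
    print("unkeyed mismatches:", simulate(None))
```

With a key, both calls hash to the same partition, so the keyed count is always 0; without a key, most pairs disagree in this model. The real mismatch rate depends on the partitioner and on how many partitions are available, but the model shows why the problem only appears when no key is set.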
Steps to Reproduce
Reproducible example:
```python
# kafka_test.py
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from opentelemetry import trace
from opentelemetry.instrumentation.kafka import KafkaInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Export every finished span to stdout as JSON (piped into jq below).
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

TOPIC = "test-topic"
NUM_PARTITIONS = 12

# Create a 12-partition topic; ignore the error if it already exists.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    admin.create_topics([NewTopic(TOPIC, num_partitions=NUM_PARTITIONS, replication_factor=1)])
except Exception:
    pass
admin.close()


def produce_hook(span, args, kwargs):
    # Copy the message value and key onto the span so each span can be matched
    # with the consumed message. Positional layout: send(topic, value, key, ...).
    if span and span.is_recording():
        value = kwargs.get("value") if "value" in kwargs else (args[1] if len(args) > 1 else None)
        if value is not None:
            span.set_attribute(
                "messaging.kafka.message.value",
                value.decode() if isinstance(value, (bytes, bytearray)) else str(value),
            )
        key = kwargs.get("key") if "key" in kwargs else (args[2] if len(args) > 2 else None)
        if key is not None:
            span.set_attribute(
                "messaging.kafka.message.key",
                key.decode() if isinstance(key, (bytes, bytearray)) else str(key),
            )


KafkaInstrumentor().instrument(produce_hook=produce_hook)
producer = KafkaProducer(bootstrap_servers="localhost:9092")

N = 10
for i in range(N):
    # No key and no explicit partition, so the partitioner chooses one.
    fut = producer.send(TOPIC, value=f"v{i}".encode())
    # Partition the record was actually written to (RecordMetadata.partition).
    actual = fut.get(timeout=10).partition

producer.close()
```
Expected Result
The `messaging.kafka.partition` attribute recorded on each producer span should equal the partition that kafka-console-consumer reports for the same message.
Actual Result
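kafka_test.py result (the numbers in parentheses were added by hand and give the partition that kafka-console-consumer reported for the same message):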
```shell
python kafka_test.py | jq -r '
"messaging.kafka.message.value : \(.attributes."messaging.kafka.message.value")",
"messaging.kafka.partition : \(.attributes."messaging.kafka.partition")",
"------------------------------------"
'
```

```
messaging.kafka.message.value : v0
messaging.kafka.partition : null (should be 0)
------------------------------------
messaging.kafka.message.value : v1
messaging.kafka.partition : 6
------------------------------------
messaging.kafka.message.value : v2
messaging.kafka.partition : 11
------------------------------------
messaging.kafka.message.value : v3
messaging.kafka.partition : 3 (should be 9)
------------------------------------
messaging.kafka.message.value : v4
messaging.kafka.partition : 7 (should be 10)
------------------------------------
messaging.kafka.message.value : v5
messaging.kafka.partition : 4
------------------------------------
messaging.kafka.message.value : v6
messaging.kafka.partition : 4 (should be 7)
------------------------------------
messaging.kafka.message.value : v7
messaging.kafka.partition : 10 (should be 9)
------------------------------------
messaging.kafka.message.value : v8
messaging.kafka.partition : 7 (should be 9)
------------------------------------
messaging.kafka.message.value : v9
messaging.kafka.partition : 9
------------------------------------
```
kafka-console-consumer result:

```shell
./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic test-topic \
  --property print.partition=true
```

```
Partition:0 v0
Partition:6 v1
Partition:11 v2
Partition:9 v3
Partition:10 v4
Partition:4 v5
Partition:7 v6
Partition:9 v7
Partition:9 v8
Partition:9 v9
```
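The two outputs can be compared mechanically. The following is a small sketch, not part of the reproducer: `SPAN_PARTITIONS` is transcribed by hand from the jq output (annotations dropped, `None` for `null`), `CONSUMER_OUTPUT` is the console-consumer output copied verbatim, and `parse_consumer_output` and `find_mismatches` are hypothetical helper names.

```python
import re

# Partition recorded on each producer span, transcribed from the jq output
# (hand-written annotations dropped; None stands for null).
SPAN_PARTITIONS = {
    "v0": None, "v1": 6, "v2": 11, "v3": 3, "v4": 7,
    "v5": 4, "v6": 4, "v7": 10, "v8": 7, "v9": 9,
}

# kafka-console-consumer output, verbatim.
CONSUMER_OUTPUT = """\
Partition:0 v0
Partition:6 v1
Partition:11 v2
Partition:9 v3
Partition:10 v4
Partition:4 v5
Partition:7 v6
Partition:9 v7
Partition:9 v8
Partition:9 v9
"""


def parse_consumer_output(text):
    """Map each message value to the partition printed with print.partition=true."""
    actual = {}
    for line in text.splitlines():
        # Accept a space or a tab between the partition field and the value.
        match = re.fullmatch(r"Partition:(\d+)\s+(.*)", line.strip())
        if match:
            actual[match.group(2)] = int(match.group(1))
    return actual


def find_mismatches(span_partitions, actual_partitions):
    """Return {value: (span_partition, actual_partition)} wherever they differ."""
    return {
        value: (span_partitions.get(value), partition)
        for value, partition in actual_partitions.items()
        if span_partitions.get(value) != partition
    }


if __name__ == "__main__":
    mismatches = find_mismatches(SPAN_PARTITIONS, parse_consumer_output(CONSUMER_OUTPUT))
    for value, (recorded, actual) in sorted(mismatches.items()):
        print(f"{value}: span={recorded} actual={actual}")
    print(f"{len(mismatches)} of {len(SPAN_PARTITIONS)} spans disagree")
```

Run on the data above, it reports 6 of 10 spans disagreeing (v0, v3, v4, v6, v7, v8), the same messages marked by hand in the kafka_test.py output.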
Additional context
No response
Would you like to implement a fix?
None