Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,34 @@ def extract_send_headers(args, kwargs):

@staticmethod
def extract_send_partition(instance, args, kwargs):
"""extract partition `send` method arguments, using the `_partition` method in KafkaProducer class"""
"""Extract partition from `send` method arguments.

Only returns a partition value when it can be determined deterministically:
- When partition is explicitly provided by the caller
- When a key is provided (partition is determined by key hash)

Returns None when partition would be randomly assigned by the
DefaultPartitioner, since calling _partition() here would produce a
different result than the actual send() call.
"""
partition = KafkaPropertiesExtractor._extract_argument(
"partition", 4, None, args, kwargs
)
# If partition is explicitly set, return it directly
if partition is not None:
return partition

key = KafkaPropertiesExtractor.extract_send_key(args, kwargs)
# If no key is provided and no explicit partition, the DefaultPartitioner
# will randomly select a partition. We cannot predict which one, so
# return None to avoid recording an incorrect value.
if key is None:
return None

# Key is provided — partition is deterministic (hash-based), safe to compute
try:
topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs)
key = KafkaPropertiesExtractor.extract_send_key(args, kwargs)
value = KafkaPropertiesExtractor.extract_send_value(args, kwargs)
partition = KafkaPropertiesExtractor._extract_argument(
"partition", 4, None, args, kwargs
)
key_bytes = instance._serialize(
instance.config["key_serializer"], topic, key
)
Expand Down Expand Up @@ -131,7 +151,10 @@ def _enrich_span(
if span.is_recording():
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka")
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic)
span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition)
if partition is not None:
span.set_attribute(
SpanAttributes.MESSAGING_KAFKA_PARTITION, partition
)
span.set_attribute(
SpanAttributes.MESSAGING_URL, json.dumps(bootstrap_servers)
)
Expand Down
Loading