Describe the bug
The KafkaPropertiesExtractor.extract_bootstrap_servers() method exists in utils.py but is never called anywhere in the instrumentation. As a result, Kafka spans are missing broker address attributes (messaging.url, server.address, server.port).
Steps to reproduce
- Instrument a Python application using opentelemetry-instrumentation-confluent-kafka
- Produce or consume messages to/from Kafka
- Inspect the generated spans
Expected behavior
Kafka spans should include broker address information:
- messaging.url with the full bootstrap servers string
- server.address with the primary broker host
- server.port with the broker port
Actual behavior
Kafka spans only contain:
- messaging.system
- messaging.destination
- messaging.kafka.partition
- messaging.destination_kind
- messaging.operation
No broker address information is available.
Evidence
In utils.py, the KafkaPropertiesExtractor class has:
class KafkaPropertiesExtractor:
    @staticmethod
    def extract_bootstrap_servers(instance):
        return instance.config.get("bootstrap_servers")  # Never called!
But this method is never invoked in __init__.py where spans are created.
Trace Data Example
Here's an actual Kafka span from a production environment:
{
  "messaging.system": "kafka",
  "messaging.destination": "backend_log",
  "messaging.kafka.partition": 0,
  "messaging.destination_kind": "queue",
  "messaging.operation": "receive"
}
The broker address (e.g., b-1.example-kafka.amazonaws.com:9092) is completely missing.
Impact
- Service dependency graphs (e.g., Grafana Service Map with Tempo) cannot show Kafka broker endpoints
- Multi-cluster Kafka environments cannot be distinguished in observability tools
- Chaos engineering tools cannot determine which Kafka cluster is being targeted
- Dependency analysis cannot determine external service addresses for Kafka
Proposed fix
- Pass bootstrap_servers to _enrich_span()
- Set messaging.url, server.address, and server.port attributes
Example fix in _enrich_span():
def _enrich_span(
    span,
    topic,
    partition: Optional[int] = None,
    offset: Optional[int] = None,
    operation: Optional[MessagingOperationValues] = None,
    bootstrap_servers: Optional[str] = None,  # Add parameter
):
    # ... existing code ...
    # Add: Set bootstrap servers attributes
    if bootstrap_servers:
        span.set_attribute(SpanAttributes.MESSAGING_URL, bootstrap_servers)
        first_broker = bootstrap_servers.split(",")[0].strip()
        if ":" in first_broker:
            host, port = first_broker.rsplit(":", 1)
            span.set_attribute(SpanAttributes.SERVER_ADDRESS, host)
            span.set_attribute(SpanAttributes.SERVER_PORT, int(port))
        else:
            span.set_attribute(SpanAttributes.SERVER_ADDRESS, first_broker)
Then call extract_bootstrap_servers() in wrap_produce(), wrap_poll(), and wrap_consume(), and pass the result to _enrich_span().
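The parsing step in the example fix could be made a bit more defensive. The sketch below is only an illustration, not the library's code: `parse_first_broker` and `broker_attributes` are hypothetical helper names, and the attribute keys are written as plain strings taken from the expected behavior above. It assumes the bootstrap servers value is a comma-separated string, and it additionally handles entries with a non-numeric port (where `int(port)` would raise) and bracketed IPv6 literals.

```python
from typing import Dict, Optional, Tuple, Union


def parse_first_broker(bootstrap_servers: str) -> Tuple[Optional[str], Optional[int]]:
    """Return (host, port) for the first entry of a comma-separated broker list.

    Hypothetical helper: either element is None when it cannot be determined.
    """
    first = bootstrap_servers.split(",")[0].strip()
    if not first:
        return None, None

    if first.startswith("["):
        # Bracketed IPv6 literal, e.g. "[::1]:9092" or "[::1]".
        host, closing, rest = first[1:].partition("]")
        if not closing:
            # Malformed entry without a closing bracket: keep it verbatim.
            return first, None
        port_text = rest[1:] if rest.startswith(":") else ""
    elif ":" in first:
        host, port_text = first.rsplit(":", 1)
    else:
        host, port_text = first, ""

    # Only convert the port when it is purely decimal digits.
    port = int(port_text) if port_text.isdecimal() else None
    return host or None, port


def broker_attributes(bootstrap_servers: Optional[str]) -> Dict[str, Union[str, int]]:
    """Build the broker-related span attributes proposed in this issue."""
    attributes: Dict[str, Union[str, int]] = {}
    if not bootstrap_servers:
        return attributes

    attributes["messaging.url"] = bootstrap_servers
    host, port = parse_first_broker(bootstrap_servers)
    if host:
        attributes["server.address"] = host
    if port is not None:
        attributes["server.port"] = port
    return attributes
```

Inside `_enrich_span()`, the resulting dict could then be applied with `span.set_attributes(broker_attributes(bootstrap_servers))`, so a missing or malformed value simply leaves the span without the extra attributes instead of raising.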
Environment
- opentelemetry-instrumentation-confluent-kafka version: 0.50b0 (also verified in v0.60b1)
- Python version: 3.11
- confluent-kafka version: 2.x
Would you like a PR?
I'm happy to submit a PR to fix this issue if the approach sounds reasonable.