[confluent-kafka] Dead code: extract_bootstrap_servers is never called, missing broker address in spans #4104

@Tetrahedrite

Describe the bug

The KafkaPropertiesExtractor.extract_bootstrap_servers() method exists in utils.py but is never called anywhere in the instrumentation. As a result, Kafka spans are missing broker address attributes (messaging.url, server.address, server.port).

Steps to reproduce

  1. Instrument a Python application using opentelemetry-instrumentation-confluent-kafka
  2. Produce or consume messages to/from Kafka
  3. Inspect the generated spans

Expected behavior

Kafka spans should include broker address information:

  • messaging.url with the full bootstrap servers string
  • server.address with the primary broker host
  • server.port with the broker port
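The mapping from a bootstrap servers string to these three attributes could look roughly like the sketch below. It is only an illustration: `split_host_port` and `broker_attributes` are hypothetical helper names, not part of the instrumentation, and the attribute keys are written as plain strings rather than semantic-convention constants.

```python
from typing import Dict, Optional, Tuple, Union


def split_host_port(broker: str) -> Tuple[str, Optional[int]]:
    """Split one 'host:port' entry; the port is None when absent or non-numeric."""
    broker = broker.strip()
    if broker.startswith("["):
        # Bracketed IPv6 literal, e.g. "[::1]:9092"
        host, _, rest = broker[1:].partition("]")
        port_text = rest[1:] if rest.startswith(":") else ""
    elif broker.count(":") == 1:
        host, port_text = broker.split(":")
    else:
        # No port, or an unbracketed IPv6 literal: keep the whole entry as host
        host, port_text = broker, ""
    return host, int(port_text) if port_text.isdecimal() else None


def broker_attributes(bootstrap_servers: str) -> Dict[str, Union[str, int]]:
    """Build the three broker attributes from a comma-separated broker list."""
    attributes: Dict[str, Union[str, int]] = {"messaging.url": bootstrap_servers}
    # Only the first broker in the list is used for server.address / server.port
    host, port = split_host_port(bootstrap_servers.split(",")[0])
    if host:
        attributes["server.address"] = host
    if port is not None:
        attributes["server.port"] = port
    return attributes
```

Entries carrying a protocol prefix (for example `PLAINTEXT://host:9092`) are not handled here; they fall into the "keep the whole entry as host" branch.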

Actual behavior

Kafka spans only contain:

  • messaging.system
  • messaging.destination
  • messaging.kafka.partition
  • messaging.destination_kind
  • messaging.operation

No broker address information is available.

Evidence

In utils.py, the KafkaPropertiesExtractor class has:

class KafkaPropertiesExtractor:
    @staticmethod
    def extract_bootstrap_servers(instance):
        return instance.config.get("bootstrap_servers")  # Never called!

But this method is never invoked in __init__.py where spans are created.
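One way to double-check the "never called" claim without relying on a text search is to walk the syntax tree of each module and look for call nodes with that name. The sketch below is a generic helper, not something from the instrumentation package; `find_call_lines` is a hypothetical name.

```python
import ast
from typing import List


def find_call_lines(source: str, function_name: str) -> List[int]:
    """Return line numbers where function_name is called as f(...) or obj.f(...)."""
    lines = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # obj.f(...) is an Attribute node, f(...) is a Name node
        called = func.attr if isinstance(func, ast.Attribute) else getattr(func, "id", None)
        if called == function_name:
            lines.append(node.lineno)
    return sorted(lines)
```

Running it over the text of utils.py and __init__.py with `"extract_bootstrap_servers"` as the name should return an empty list for both files if the method is indeed dead code. A definition alone does not count, since only call expressions are matched.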

Trace Data Example

Here are the attributes of an actual Kafka span from a production environment:

{
  "messaging.system": "kafka",
  "messaging.destination": "backend_log",
  "messaging.kafka.partition": 0,
  "messaging.destination_kind": "queue",
  "messaging.operation": "receive"
}

The broker address (e.g., b-1.example-kafka.amazonaws.com:9092) is completely missing.

Impact

  • Service dependency graphs (e.g., Grafana Service Map with Tempo) cannot show Kafka broker endpoints
  • Multi-cluster Kafka environments cannot be distinguished in observability tools
  • Chaos engineering tools cannot determine which Kafka cluster is being targeted
  • Dependency analysis cannot determine external service addresses for Kafka

Proposed fix

  1. Pass bootstrap_servers to _enrich_span()
  2. Set messaging.url, server.address, and server.port attributes

Example fix in _enrich_span():

def _enrich_span(
    span,
    topic,
    partition: Optional[int] = None,
    offset: Optional[int] = None,
    operation: Optional[MessagingOperationValues] = None,
    bootstrap_servers: Optional[str] = None,  # Add parameter
):
    # ... existing code ...

    # Add: Set bootstrap servers attributes
    if bootstrap_servers:
        span.set_attribute(SpanAttributes.MESSAGING_URL, bootstrap_servers)
        # Use the first broker in the comma-separated list as the server address
        first_broker = bootstrap_servers.split(",")[0].strip()
        host, sep, port = first_broker.rpartition(":")
        if sep and host and port.isdecimal():
            span.set_attribute(SpanAttributes.SERVER_ADDRESS, host)
            span.set_attribute(SpanAttributes.SERVER_PORT, int(port))
        else:
            # No usable port (missing or non-numeric): record the address only
            span.set_attribute(SpanAttributes.SERVER_ADDRESS, first_broker)

Then call extract_bootstrap_servers() in wrap_produce(), wrap_poll(), and wrap_consume(), and pass the result to _enrich_span().
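To make the intended call chain concrete, here is a self-contained sketch of the wiring. It does not use the real instrumentation or OpenTelemetry classes: `RecordingSpan` and `FakeProducer` are hypothetical stand-ins, and the simplified `enrich_span` and `wrap_produce` signatures are assumptions for illustration, not the package's actual ones.

```python
from typing import Any, Callable, Dict, Optional


class KafkaPropertiesExtractor:
    @staticmethod
    def extract_bootstrap_servers(instance: Any) -> Optional[str]:
        # Same lookup as the method quoted in this issue
        return instance.config.get("bootstrap_servers")


class RecordingSpan:
    """Hypothetical stand-in for a span; it only records attributes."""

    def __init__(self) -> None:
        self.attributes: Dict[str, Any] = {}

    def set_attribute(self, key: str, value: Any) -> None:
        self.attributes[key] = value


class FakeProducer:
    """Hypothetical stand-in for the instrumented producer object."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config


def enrich_span(
    span: RecordingSpan, topic: str, bootstrap_servers: Optional[str] = None
) -> None:
    span.set_attribute("messaging.destination", topic)
    if bootstrap_servers:
        # Host and port extraction is left out to keep the sketch short
        span.set_attribute("messaging.url", bootstrap_servers)


def wrap_produce(
    func: Callable[..., Any], instance: FakeProducer, span: RecordingSpan, topic: str
) -> Any:
    # The step this issue asks for: read the broker list and hand it to enrichment
    servers = KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
    enrich_span(span, topic, bootstrap_servers=servers)
    return func(topic)
```

When the config has no `bootstrap_servers` entry, the extractor returns `None` and the span keeps only its existing attributes, so the change stays backward compatible in this sketch.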

Environment

  • opentelemetry-instrumentation-confluent-kafka version: 0.50b0 (also verified in v0.60b1)
  • Python version: 3.11
  • confluent-kafka version: 2.x

Would you like a PR?

I'm happy to submit a PR to fix this issue if the approach sounds reasonable.
