[confluent-kafka] Dead code: extract_bootstrap_servers is never called, missing broker address in spans #4104

@Tetrahedrite

Describe the bug

The KafkaPropertiesExtractor.extract_bootstrap_servers() method exists in utils.py but is never called anywhere in the instrumentation. As a result, Kafka spans are missing broker address attributes (messaging.url, server.address, server.port).

Steps to reproduce

  1. Instrument a Python application using opentelemetry-instrumentation-confluent-kafka
  2. Produce or consume messages to/from Kafka
  3. Inspect the generated spans

Expected behavior

Kafka spans should include broker address information:

  • messaging.url with the full bootstrap servers string
  • server.address with the primary broker host
  • server.port with the broker port
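As a rough illustration of the mapping described above, the following sketch derives the three attributes from a bootstrap servers string. The function name `expected_broker_attributes` and the second broker hostname are hypothetical, used only for this example; the attribute keys are the ones listed above.

```python
from typing import Dict, Union


def expected_broker_attributes(bootstrap_servers: str) -> Dict[str, Union[str, int]]:
    """Map a comma-separated bootstrap servers string to the expected attributes."""
    attributes: Dict[str, Union[str, int]] = {"messaging.url": bootstrap_servers}
    # Treat the first listed broker as the primary one.
    first_broker = bootstrap_servers.split(",")[0].strip()
    host, separator, port = first_broker.rpartition(":")
    if separator and port.isdigit():
        attributes["server.address"] = host
        attributes["server.port"] = int(port)
    else:
        # No numeric port found: report the whole entry as the address.
        attributes["server.address"] = first_broker
    return attributes


if __name__ == "__main__":
    # The second hostname is made up for illustration.
    servers = "b-1.example-kafka.amazonaws.com:9092,b-2.example-kafka.amazonaws.com:9092"
    print(expected_broker_attributes(servers))
```

Picking the first broker as the "primary" host is a simplification, since a bootstrap list does not identify which broker actually served a given message.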

Actual behavior

Kafka spans only contain:

  • messaging.system
  • messaging.destination
  • messaging.kafka.partition
  • messaging.destination_kind
  • messaging.operation

No broker address information is available.

Evidence

In utils.py, the KafkaPropertiesExtractor class has:

class KafkaPropertiesExtractor:
    @staticmethod
    def extract_bootstrap_servers(instance):
        return instance.config.get("bootstrap_servers")  # Never called!

But this method is never invoked in __init__.py where spans are created.

Trace Data Example

Here's an actual Kafka span from a production environment:

{
  "messaging.system": "kafka",
  "messaging.destination": "backend_log",
  "messaging.kafka.partition": 0,
  "messaging.destination_kind": "queue",
  "messaging.operation": "receive"
}

The broker address (e.g., b-1.example-kafka.amazonaws.com:9092) is completely missing.
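A quick way to confirm the gap on an exported span is to compare its attribute keys against the expected broker attributes. This is a minimal sketch that works on a plain dictionary of attributes; the helper name `missing_broker_attributes` is hypothetical, and the sample data is the span shown above.

```python
from typing import Any, List, Mapping

# Broker-related attribute keys this issue expects on Kafka spans.
BROKER_ATTRIBUTE_KEYS = ("messaging.url", "server.address", "server.port")


def missing_broker_attributes(span_attributes: Mapping[str, Any]) -> List[str]:
    """Return the expected broker attribute keys that are absent from a span."""
    return [key for key in BROKER_ATTRIBUTE_KEYS if key not in span_attributes]


# Attributes copied from the span in this report.
observed_span = {
    "messaging.system": "kafka",
    "messaging.destination": "backend_log",
    "messaging.kafka.partition": 0,
    "messaging.destination_kind": "queue",
    "messaging.operation": "receive",
}

if __name__ == "__main__":
    print(missing_broker_attributes(observed_span))
    # prints ['messaging.url', 'server.address', 'server.port']
```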

Impact

  • Service dependency graphs (e.g., Grafana Service Map with Tempo) cannot show Kafka broker endpoints
  • Multi-cluster Kafka environments cannot be distinguished in observability tools
  • Chaos engineering tools cannot determine which Kafka cluster is being targeted
  • Dependency analysis cannot determine external service addresses for Kafka

Proposed fix

  1. Pass bootstrap_servers to _enrich_span()
  2. Set messaging.url, server.address, and server.port attributes

Example fix in _enrich_span():

def _enrich_span(
    span,
    topic,
    partition: Optional[int] = None,
    offset: Optional[int] = None,
    operation: Optional[MessagingOperationValues] = None,
    bootstrap_servers: Optional[str] = None,  # Add parameter
):
    # ... existing code ...
    
    # Add: set broker address attributes from the bootstrap servers string
    if bootstrap_servers:
        span.set_attribute(SpanAttributes.MESSAGING_URL, bootstrap_servers)
        # Use the first listed broker as the representative server address
        first_broker = bootstrap_servers.split(",")[0].strip()
        host, sep, port = first_broker.rpartition(":")
        if sep and port.isdigit():
            span.set_attribute(SpanAttributes.SERVER_ADDRESS, host)
            span.set_attribute(SpanAttributes.SERVER_PORT, int(port))
        else:
            # No numeric port: record the whole entry instead of raising in int()
            span.set_attribute(SpanAttributes.SERVER_ADDRESS, first_broker)

Then call extract_bootstrap_servers() in wrap_produce(), wrap_poll(), and wrap_consume(), and pass the result to _enrich_span() as bootstrap_servers.
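The call-site wiring could look roughly like the sketch below. Everything here is a stand-in rather than the instrumentation's actual code: `lookup_bootstrap_servers`, `FakeProducer`, and `wrap_produce_sketch` are hypothetical names, and whether the instrumented client exposes a `config` mapping is an assumption. The sketch also checks the dotted key `bootstrap.servers`, since confluent-kafka client configs are conventionally written with that key, before falling back to the underscore key that extract_bootstrap_servers() reads.

```python
from typing import Any, Dict, Mapping, Optional


def lookup_bootstrap_servers(config: Mapping[str, Any]) -> Optional[str]:
    """Return the bootstrap servers from a client config as one comma-separated string."""
    # Assumption: try the dotted key first, then the underscore key
    # that extract_bootstrap_servers() currently reads.
    for key in ("bootstrap.servers", "bootstrap_servers"):
        value = config.get(key)
        if value:
            # Accept a list or tuple of brokers as well as a plain string.
            if isinstance(value, (list, tuple)):
                return ",".join(str(item) for item in value)
            return str(value)
    return None


class FakeProducer:
    """Hypothetical stand-in for an instrumented client exposing a config mapping."""

    def __init__(self, config: Dict[str, Any]) -> None:
        self.config = config


def wrap_produce_sketch(instance: FakeProducer, topic: str) -> Dict[str, Any]:
    """Hypothetical wrapper body: collect the values that would go to _enrich_span()."""
    return {
        "topic": topic,
        "bootstrap_servers": lookup_bootstrap_servers(instance.config),
    }


if __name__ == "__main__":
    producer = FakeProducer({"bootstrap.servers": "b-1.example-kafka.amazonaws.com:9092"})
    print(wrap_produce_sketch(producer, "backend_log"))
```

Returning None when no key is present lets the `if bootstrap_servers:` guard in _enrich_span() skip the new attributes without any extra handling.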

Environment

  • opentelemetry-instrumentation-confluent-kafka version: 0.50b0 (also verified in v0.60b1)
  • Python version: 3.11
  • confluent-kafka version: 2.x

Would you like a PR?

I'm happy to submit a PR to fix this issue if the approach sounds reasonable.
