Skip to content

Commit 3f3d1b6

Browse files
committed
fix(confluent-kafka): populate bootstrap.servers span attributes
KafkaPropertiesExtractor.extract_bootstrap_servers was defined but never called, so confluent-kafka spans were missing messaging.url, server.address, and server.port even though the extraction helper existed for it. - Capture the config dict in AutoInstrumentedProducer/Consumer and surface it on the Proxied{Producer,Consumer} wrappers. - extract_bootstrap_servers now accepts both the "bootstrap.servers" dotted key (confluent-kafka standard) and "bootstrap_servers", and safely handles list-valued configs and instances without a config attribute. - wrap_produce / wrap_poll / wrap_consume pull the config and pass it through _enrich_span, which sets messaging.url, server.address, and server.port (parsing host:port from the first broker). Closes #4104 Signed-off-by: alliasgher <alliasgher123@gmail.com>
1 parent a912524 commit 3f3d1b6

5 files changed

Lines changed: 142 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
1212
## Unreleased
1313

14+
### Fixed
15+
16+
- `opentelemetry-instrumentation-confluent-kafka`: Populate `messaging.url`, `server.address` and `server.port` span attributes from the producer/consumer `bootstrap.servers` config; previously `KafkaPropertiesExtractor.extract_bootstrap_servers` was defined but never called
17+
([#4104](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/4104))
18+
1419
### Breaking changes
1520

1621
- Drop Python 3.9 support

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,27 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
127127
from .version import __version__
128128

129129

130+
def _capture_config(args, kwargs):
131+
"""Return the config dict that was passed to a Producer/Consumer
132+
constructor, regardless of whether it was supplied positionally, as
133+
``conf=`` kwarg, or (for Consumer) expanded as **kwargs."""
134+
if args and isinstance(args[0], dict):
135+
return args[0]
136+
conf = kwargs.get("conf")
137+
if isinstance(conf, dict):
138+
return conf
139+
# confluent_kafka.Consumer also supports Consumer(**conf) — in that case
140+
# the kwargs themselves are the config.
141+
if kwargs:
142+
return dict(kwargs)
143+
return None
144+
145+
130146
class AutoInstrumentedProducer(Producer):
    """Producer subclass that records its constructor config so span
    enrichment can later read ``bootstrap.servers`` from it."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Captured for KafkaPropertiesExtractor.extract_bootstrap_servers.
        self.config = _capture_config(args, kwargs)

    # This method is deliberately implemented in order to allow wrapt to wrap this function
    def produce(self, topic, value=None, *args, **kwargs):  # pylint: disable=keyword-arg-before-vararg,useless-super-delegation
        super().produce(topic, value, *args, **kwargs)
@@ -136,6 +156,7 @@ def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keywor
136156
class AutoInstrumentedConsumer(Consumer):
137157
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    # No consume span is in flight until the first poll/consume call.
    self._current_consume_span = None
    # Captured for KafkaPropertiesExtractor.extract_bootstrap_servers.
    self.config = _capture_config(args, kwargs)
140161

141162
# This method is deliberately implemented in order to allow wrapt to wrap this function
@@ -155,6 +176,10 @@ class ProxiedProducer(Producer):
155176
def __init__(self, producer: Producer, tracer: Tracer):
    self._producer = producer
    self._tracer = tracer
    # Expose the wrapped producer's config (when it has one) so that
    # KafkaPropertiesExtractor.extract_bootstrap_servers can read it
    # through this proxy.
    self.config = getattr(producer, "config", None)
158183

159184
def flush(self, timeout=-1):
    """Delegate flush to the wrapped producer and return its result."""
    return self._producer.flush(timeout)
@@ -184,6 +209,8 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
184209
self._tracer = tracer
185210
self._current_consume_span = None
186211
self._current_context_token = None
212+
# See ProxiedProducer.__init__ for rationale.
213+
self.config = getattr(consumer, "config", None)
187214

188215
def close(self, *args, **kwargs):
189216
return ConfluentKafkaInstrumentor.wrap_close(
@@ -367,10 +394,14 @@ def wrap_produce(func, instance, tracer, args, kwargs):
367394
topic = KafkaPropertiesExtractor.extract_produce_topic(
368395
args, kwargs
369396
)
397+
bootstrap_servers = (
398+
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
399+
)
370400
_enrich_span(
371401
span,
372402
topic,
373403
operation=MessagingOperationTypeValues.RECEIVE,
404+
bootstrap_servers=bootstrap_servers,
374405
) # Replace
375406
propagate.inject(
376407
headers,
@@ -385,6 +416,9 @@ def wrap_poll(func, instance, tracer, args, kwargs):
385416

386417
record = func(*args, **kwargs)
387418
if record:
419+
bootstrap_servers = (
420+
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
421+
)
388422
with tracer.start_as_current_span(
389423
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
390424
):
@@ -395,6 +429,7 @@ def wrap_poll(func, instance, tracer, args, kwargs):
395429
record.partition(),
396430
record.offset(),
397431
operation=MessagingOperationTypeValues.PROCESS,
432+
bootstrap_servers=bootstrap_servers,
398433
)
399434
instance._current_context_token = context.attach(
400435
trace.set_span_in_context(instance._current_consume_span)
@@ -409,6 +444,9 @@ def wrap_consume(func, instance, tracer, args, kwargs):
409444

410445
records = func(*args, **kwargs)
411446
if len(records) > 0:
447+
bootstrap_servers = (
448+
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
449+
)
412450
with tracer.start_as_current_span(
413451
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
414452
):
@@ -417,6 +455,7 @@ def wrap_consume(func, instance, tracer, args, kwargs):
417455
instance._current_consume_span,
418456
records[0].topic(),
419457
operation=MessagingOperationTypeValues.PROCESS,
458+
bootstrap_servers=bootstrap_servers,
420459
)
421460
instance._current_context_token = context.attach(
422461
trace.set_span_in_context(instance._current_consume_span)

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99
MESSAGING_SYSTEM,
1010
MessagingOperationTypeValues,
1111
)
12+
from opentelemetry.semconv.attributes.server_attributes import (
13+
SERVER_ADDRESS,
14+
SERVER_PORT,
15+
)
1216
from opentelemetry.semconv.trace import (
1317
MessagingDestinationKindValues,
1418
SpanAttributes,
@@ -21,7 +25,17 @@
2125
class KafkaPropertiesExtractor:
    @staticmethod
    def extract_bootstrap_servers(instance):
        """Return the bootstrap servers string from *instance*'s config.

        Accepts both the confluent-kafka dotted key ``bootstrap.servers``
        and the python-style ``bootstrap_servers``; list/tuple values are
        joined into a comma-separated string. Returns ``None`` when the
        instance has no dict-valued ``config`` attribute.
        """
        config = getattr(instance, "config", None)
        if not isinstance(config, dict):
            return None
        servers = config.get("bootstrap.servers")
        if not servers:
            # Fall back to the python-style key for robustness.
            servers = config.get("bootstrap_servers")
        if isinstance(servers, (list, tuple)):
            servers = ",".join(str(broker) for broker in servers)
        return servers
2539

2640
@staticmethod
2741
def _extract_argument(key, position, default_value, args, kwargs):
@@ -115,12 +129,37 @@ def _get_links_from_records(records):
115129
return links
116130

117131

132+
def _set_bootstrap_servers_attributes(span, bootstrap_servers):
    """Populate ``messaging.url``, ``server.address`` and ``server.port``
    from a ``bootstrap.servers`` string (e.g. ``host1:9092,host2:9092``).

    ``messaging.url`` gets the full string; address/port are parsed from
    the first broker entry. No-op when *bootstrap_servers* is falsy.
    """
    if not bootstrap_servers:
        return

    span.set_attribute(SpanAttributes.MESSAGING_URL, bootstrap_servers)

    first_broker = bootstrap_servers.split(",")[0].strip()
    if not first_broker:
        return

    # rpartition tolerates hosts that themselves contain ":" (e.g. IPv6
    # literals) by splitting on the last colon only.
    host, sep, port = first_broker.rpartition(":")
    if not sep:
        # No port was given; the whole broker entry is the host.
        span.set_attribute(SERVER_ADDRESS, first_broker)
        return

    span.set_attribute(SERVER_ADDRESS, host)
    try:
        span.set_attribute(SERVER_PORT, int(port))
    except ValueError:
        # Port wasn't numeric; skip rather than emit a bad attribute.
        # NOTE: use a locally-obtained logger — no module-level `_LOG` is
        # visible in utils.py, so referencing `_LOG` here would raise
        # NameError on this error path.
        import logging

        logging.getLogger(__name__).debug(
            "non-numeric port in bootstrap.servers: %r", port
        )
154+
155+
118156
def _enrich_span(
119157
span,
120158
topic,
121159
partition: Optional[int] = None,
122160
offset: Optional[int] = None,
123161
operation: Optional[MessagingOperationTypeValues] = None,
162+
bootstrap_servers: Optional[str] = None,
124163
):
125164
if not span.is_recording():
126165
return
@@ -141,6 +180,8 @@ def _enrich_span(
141180
else:
142181
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)
143182

183+
_set_bootstrap_servers_attributes(span, bootstrap_servers)
184+
144185
# https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
145186
# A message within Kafka is uniquely defined by its topic name, topic partition and offset.
146187
if partition is not None and offset is not None and topic:

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
MESSAGING_OPERATION,
3131
MESSAGING_SYSTEM,
3232
)
33+
from opentelemetry.semconv.attributes.server_attributes import (
34+
SERVER_ADDRESS,
35+
SERVER_PORT,
36+
)
3337
from opentelemetry.semconv.trace import (
3438
MessagingDestinationKindValues,
3539
SpanAttributes,
@@ -447,3 +451,53 @@ def test_producer_flush(self) -> None:
447451
span_list = self.memory_exporter.get_finished_spans()
448452
self._assert_span_count(span_list, 1)
449453
self._assert_topic(span_list[0], "topic-1")
454+
455+
def test_producer_sets_bootstrap_servers_attributes(self) -> None:
    """An instrumented producer's spans carry URL/address/port taken
    from its ``bootstrap.servers`` config."""
    servers = "broker-a:9092,broker-b:9093"
    producer = ConfluentKafkaInstrumentor().instrument_producer(
        MockedProducer([], {"bootstrap.servers": servers})
    )
    producer.produce(topic="topic-1", key="k", value="v")

    span = self.memory_exporter.get_finished_spans()[0]
    self.assertEqual(span.attributes[SpanAttributes.MESSAGING_URL], servers)
    # server.address / server.port come from the first broker entry.
    self.assertEqual(span.attributes[SERVER_ADDRESS], "broker-a")
    self.assertEqual(span.attributes[SERVER_PORT], 9092)
475+
def test_consumer_sets_bootstrap_servers_attributes(self) -> None:
    """An instrumented consumer's process spans carry URL/address/port
    taken from its ``bootstrap.servers`` config."""
    config = {
        "bootstrap.servers": "broker-1:9092",
        "group.id": "g",
        "auto.offset.reset": "earliest",
    }
    consumer = MockConsumer([MockedMessage("topic-1", 0, 0, [])], config)

    self.memory_exporter.clear()
    consumer = ConfluentKafkaInstrumentor().instrument_consumer(consumer)
    consumer.poll()
    # A second (empty) poll ends the in-flight `<topic> process` span so
    # it shows up in the exporter.
    consumer.poll()

    process_span = next(
        span
        for span in self.memory_exporter.get_finished_spans()
        if span.name == "topic-1 process"
    )
    self.assertEqual(
        process_span.attributes[SpanAttributes.MESSAGING_URL],
        "broker-1:9092",
    )
    self.assertEqual(process_span.attributes[SERVER_ADDRESS], "broker-1")
    self.assertEqual(process_span.attributes[SERVER_PORT], 9092)

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
class MockConsumer(Consumer):
77
def __init__(self, queue, config):
88
self._queue = queue
9+
self.config = config
910
super().__init__(config)
1011

1112
def consume(self, num_messages=1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
@@ -58,6 +59,7 @@ def value(self):
5859
class MockedProducer(Producer):
5960
def __init__(self, queue, config):
    # Expose config the same way the instrumented wrappers do, so
    # extract_bootstrap_servers can read it in tests.
    self.config = config
    self._queue = queue
    super().__init__(config)
6264

6365
def produce(self, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg

0 commit comments

Comments
 (0)