Skip to content

Commit 17006d7

Browse files
authored
fix(confluent-kafka): populate bootstrap.servers span attributes (#4423)
* fix(confluent-kafka): populate bootstrap.servers span attributes KafkaPropertiesExtractor.extract_bootstrap_servers was defined but never called, so confluent-kafka spans were missing messaging.url, server.address, and server.port even though the extraction helper existed for it. - Capture the config dict in AutoInstrumentedProducer/Consumer and surface it on the Proxied{Producer,Consumer} wrappers. - extract_bootstrap_servers now accepts both the "bootstrap.servers" dotted key (confluent-kafka standard) and "bootstrap_servers", and safely handles list-valued configs and instances without a config attribute. - wrap_produce / wrap_poll / wrap_consume pull the config and pass it through _enrich_span, which sets messaging.url, server.address, and server.port (parsing host:port from the first broker). Closes #4104 Signed-off-by: alliasgher <alliasgher123@gmail.com> * fix(confluent-kafka): remove deprecated messaging.url, fix CHANGELOG link Signed-off-by: alliasgher <alliasgher123@gmail.com> * fix(confluent-kafka): use PUBLISH for second produce path per review The second _enrich_span call in wrap_produce also runs inside a producer span but used RECEIVE. Align with the first path and with the semantic conventions. Signed-off-by: Ali <alliasgher123@gmail.com> --------- Signed-off-by: alliasgher <alliasgher123@gmail.com> Signed-off-by: Ali <alliasgher123@gmail.com>
1 parent a3d38a7 commit 17006d7

5 files changed

Lines changed: 134 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5959
- `opentelemetry-instrumentation-mysqlclient`: Update unit tests to properly validate trace context trace flag values.
6060
([#4560](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4560))
6161

62+
### Fixed
63+
64+
- `opentelemetry-instrumentation-confluent-kafka`: Populate `server.address` and `server.port` span attributes from the producer/consumer `bootstrap.servers` config; previously `KafkaPropertiesExtractor.extract_bootstrap_servers` was defined but never called
65+
([#4423](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4423))
66+
6267
### Breaking changes
6368

6469
- Drop Python 3.9 support

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,27 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
116116
from .version import __version__
117117

118118

119+
def _capture_config(args, kwargs):
120+
"""Return the config dict that was passed to a Producer/Consumer
121+
constructor, regardless of whether it was supplied positionally, as
122+
``conf=`` kwarg, or (for Consumer) expanded as **kwargs."""
123+
if args and isinstance(args[0], dict):
124+
return args[0]
125+
conf = kwargs.get("conf")
126+
if isinstance(conf, dict):
127+
return conf
128+
# confluent_kafka.Consumer also supports Consumer(**conf) — in that case
129+
# the kwargs themselves are the config.
130+
if kwargs:
131+
return dict(kwargs)
132+
return None
133+
134+
119135
class AutoInstrumentedProducer(Producer):
136+
def __init__(self, *args, **kwargs):
137+
super().__init__(*args, **kwargs)
138+
self.config = _capture_config(args, kwargs)
139+
120140
# This method is deliberately implemented in order to allow wrapt to wrap this function
121141
def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg,useless-super-delegation
122142
super().produce(topic, value, *args, **kwargs)
@@ -125,6 +145,7 @@ def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keywor
125145
class AutoInstrumentedConsumer(Consumer):
126146
def __init__(self, *args, **kwargs):
127147
super().__init__(*args, **kwargs)
148+
self.config = _capture_config(args, kwargs)
128149
self._current_consume_span = None
129150

130151
# This method is deliberately implemented in order to allow wrapt to wrap this function
@@ -144,6 +165,10 @@ class ProxiedProducer(Producer):
144165
def __init__(self, producer: Producer, tracer: Tracer):
145166
self._producer = producer
146167
self._tracer = tracer
168+
# Surface the wrapped producer's config (if any) so that
169+
# KafkaPropertiesExtractor.extract_bootstrap_servers can read it
170+
# through this proxy.
171+
self.config = getattr(producer, "config", None)
147172

148173
def flush(self, timeout=-1):
149174
return self._producer.flush(timeout)
@@ -173,6 +198,8 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
173198
self._tracer = tracer
174199
self._current_consume_span = None
175200
self._current_context_token = None
201+
# See ProxiedProducer.__init__ for rationale.
202+
self.config = getattr(consumer, "config", None)
176203

177204
def close(self, *args, **kwargs):
178205
return ConfluentKafkaInstrumentor.wrap_close(
@@ -355,11 +382,15 @@ def wrap_produce(func, instance, tracer, args, kwargs):
355382
topic = KafkaPropertiesExtractor.extract_produce_topic(
356383
args, kwargs
357384
)
385+
bootstrap_servers = (
386+
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
387+
)
358388
_enrich_span(
359389
span,
360390
topic,
361-
operation=MessagingOperationTypeValues.RECEIVE,
362-
) # Replace
391+
operation=MessagingOperationTypeValues.PUBLISH,
392+
bootstrap_servers=bootstrap_servers,
393+
) # Publish
363394
propagate.inject(
364395
headers,
365396
setter=_kafka_setter,
@@ -373,6 +404,9 @@ def wrap_poll(func, instance, tracer, args, kwargs):
373404

374405
record = func(*args, **kwargs)
375406
if record:
407+
bootstrap_servers = (
408+
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
409+
)
376410
with tracer.start_as_current_span(
377411
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
378412
):
@@ -383,6 +417,7 @@ def wrap_poll(func, instance, tracer, args, kwargs):
383417
record.partition(),
384418
record.offset(),
385419
operation=MessagingOperationTypeValues.PROCESS,
420+
bootstrap_servers=bootstrap_servers,
386421
)
387422
instance._current_context_token = context.attach(
388423
trace.set_span_in_context(instance._current_consume_span)
@@ -397,6 +432,9 @@ def wrap_consume(func, instance, tracer, args, kwargs):
397432

398433
records = func(*args, **kwargs)
399434
if len(records) > 0:
435+
bootstrap_servers = (
436+
KafkaPropertiesExtractor.extract_bootstrap_servers(instance)
437+
)
400438
with tracer.start_as_current_span(
401439
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
402440
):
@@ -405,6 +443,7 @@ def wrap_consume(func, instance, tracer, args, kwargs):
405443
instance._current_consume_span,
406444
records[0].topic(),
407445
operation=MessagingOperationTypeValues.PROCESS,
446+
bootstrap_servers=bootstrap_servers,
408447
)
409448
instance._current_context_token = context.attach(
410449
trace.set_span_in_context(instance._current_consume_span)

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
MESSAGING_SYSTEM,
1313
MessagingOperationTypeValues,
1414
)
15+
from opentelemetry.semconv.attributes.server_attributes import (
16+
SERVER_ADDRESS,
17+
SERVER_PORT,
18+
)
1519
from opentelemetry.semconv.trace import (
1620
MessagingDestinationKindValues,
1721
SpanAttributes,
@@ -24,7 +28,17 @@
2428
class KafkaPropertiesExtractor:
2529
@staticmethod
2630
def extract_bootstrap_servers(instance):
27-
return instance.config.get("bootstrap_servers")
31+
config = getattr(instance, "config", None)
32+
if not isinstance(config, dict):
33+
return None
34+
# confluent-kafka uses the dotted key "bootstrap.servers"; also accept
35+
# the python-style "bootstrap_servers" for robustness.
36+
servers = config.get("bootstrap.servers") or config.get(
37+
"bootstrap_servers"
38+
)
39+
if isinstance(servers, (list, tuple)):
40+
servers = ",".join(str(s) for s in servers)
41+
return servers
2842

2943
@staticmethod
3044
def _extract_argument(key, position, default_value, args, kwargs):
@@ -118,12 +132,35 @@ def _get_links_from_records(records):
118132
return links
119133

120134

135+
def _set_bootstrap_servers_attributes(span, bootstrap_servers):
136+
"""Populate server.address and server.port from a bootstrap.servers
137+
string (e.g. ``host1:9092,host2:9092``)."""
138+
if not bootstrap_servers:
139+
return
140+
141+
first_broker = bootstrap_servers.split(",")[0].strip()
142+
if not first_broker:
143+
return
144+
145+
if ":" in first_broker:
146+
host, _, port = first_broker.rpartition(":")
147+
span.set_attribute(SERVER_ADDRESS, host)
148+
try:
149+
span.set_attribute(SERVER_PORT, int(port))
150+
except ValueError:
151+
# Port wasn't numeric; skip rather than emit a bad attribute.
152+
_LOG.debug("non-numeric port in bootstrap.servers: %r", port)
153+
else:
154+
span.set_attribute(SERVER_ADDRESS, first_broker)
155+
156+
121157
def _enrich_span(
122158
span,
123159
topic,
124160
partition: Optional[int] = None,
125161
offset: Optional[int] = None,
126162
operation: Optional[MessagingOperationTypeValues] = None,
163+
bootstrap_servers: Optional[str] = None,
127164
):
128165
if not span.is_recording():
129166
return
@@ -144,6 +181,8 @@ def _enrich_span(
144181
else:
145182
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)
146183

184+
_set_bootstrap_servers_attributes(span, bootstrap_servers)
185+
147186
# https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
148187
# A message within Kafka is uniquely defined by its topic name, topic partition and offset.
149188
if partition is not None and offset is not None and topic:

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
MESSAGING_OPERATION,
2020
MESSAGING_SYSTEM,
2121
)
22+
from opentelemetry.semconv.attributes.server_attributes import (
23+
SERVER_ADDRESS,
24+
SERVER_PORT,
25+
)
2226
from opentelemetry.semconv.trace import (
2327
MessagingDestinationKindValues,
2428
SpanAttributes,
@@ -436,3 +440,45 @@ def test_producer_flush(self) -> None:
436440
span_list = self.memory_exporter.get_finished_spans()
437441
self._assert_span_count(span_list, 1)
438442
self._assert_topic(span_list[0], "topic-1")
443+
444+
def test_producer_sets_bootstrap_servers_attributes(self) -> None:
445+
instrumentation = ConfluentKafkaInstrumentor()
446+
producer = MockedProducer(
447+
[],
448+
{
449+
"bootstrap.servers": "broker-a:9092,broker-b:9093",
450+
},
451+
)
452+
453+
producer = instrumentation.instrument_producer(producer)
454+
producer.produce(topic="topic-1", key="k", value="v")
455+
456+
span = self.memory_exporter.get_finished_spans()[0]
457+
self.assertEqual(span.attributes[SERVER_ADDRESS], "broker-a")
458+
self.assertEqual(span.attributes[SERVER_PORT], 9092)
459+
460+
def test_consumer_sets_bootstrap_servers_attributes(self) -> None:
461+
instrumentation = ConfluentKafkaInstrumentor()
462+
consumer = MockConsumer(
463+
[MockedMessage("topic-1", 0, 0, [])],
464+
{
465+
"bootstrap.servers": "broker-1:9092",
466+
"group.id": "g",
467+
"auto.offset.reset": "earliest",
468+
},
469+
)
470+
471+
self.memory_exporter.clear()
472+
consumer = instrumentation.instrument_consumer(consumer)
473+
consumer.poll()
474+
# Second (empty) poll ends the in-flight `<topic> process` span so it
475+
# shows up in the exporter.
476+
consumer.poll()
477+
478+
process_span = next(
479+
s
480+
for s in self.memory_exporter.get_finished_spans()
481+
if s.name == "topic-1 process"
482+
)
483+
self.assertEqual(process_span.attributes[SERVER_ADDRESS], "broker-1")
484+
self.assertEqual(process_span.attributes[SERVER_PORT], 9092)

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
class MockConsumer(Consumer):
1010
def __init__(self, queue, config):
1111
self._queue = queue
12+
self.config = config
1213
super().__init__(config)
1314

1415
def consume(self, num_messages=1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
@@ -61,6 +62,7 @@ def value(self):
6162
class MockedProducer(Producer):
6263
def __init__(self, queue, config):
6364
self._queue = queue
65+
self.config = config
6466
super().__init__(config)
6567

6668
def produce(self, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg

0 commit comments

Comments
 (0)