diff --git a/CHANGELOG.md b/CHANGELOG.md index 64da93791c..c13f40380c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-flask`: Clean up environ keys in `_teardown_request` to prevent duplicate execution ([#4341](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4341)) +### Fixed + +- `opentelemetry-instrumentation-confluent-kafka`: Populate `server.address` and `server.port` span attributes from the producer/consumer `bootstrap.servers` config; previously `KafkaPropertiesExtractor.extract_bootstrap_servers` was defined but never called + ([#4423](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4423)) + ### Breaking changes - Drop Python 3.9 support diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index ed390d7006..1c47b836cd 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -127,7 +127,27 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None) from .version import __version__ +def _capture_config(args, kwargs): + """Return the config dict that was passed to a Producer/Consumer + constructor, regardless of whether it was supplied positionally, as + ``conf=`` kwarg, or (for Consumer) expanded as **kwargs.""" + if args and isinstance(args[0], dict): + return args[0] + conf = kwargs.get("conf") + if isinstance(conf, dict): + return conf + # confluent_kafka.Consumer also supports Consumer(**conf) — in that case + # the kwargs themselves are the config. + if kwargs: + return dict(kwargs) + return None + + class AutoInstrumentedProducer(Producer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.config = _capture_config(args, kwargs) + # This method is deliberately implemented in order to allow wrapt to wrap this function def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg,useless-super-delegation super().produce(topic, value, *args, **kwargs) @@ -136,6 +156,7 @@ def produce(self, topic, value=None, *args, **kwargs): # pylint: disable=keywor class AutoInstrumentedConsumer(Consumer): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.config = _capture_config(args, kwargs) self._current_consume_span = None # This method is deliberately implemented in order to allow wrapt to wrap this function @@ -155,6 +176,10 @@ class ProxiedProducer(Producer): def __init__(self, producer: Producer, tracer: Tracer): self._producer = producer self._tracer = tracer + # Surface the wrapped producer's config (if any) so that + # KafkaPropertiesExtractor.extract_bootstrap_servers can read it + # through this proxy. + self.config = getattr(producer, "config", None) def flush(self, timeout=-1): return self._producer.flush(timeout) @@ -184,6 +209,8 @@ def __init__(self, consumer: Consumer, tracer: Tracer): self._tracer = tracer self._current_consume_span = None self._current_context_token = None + # See ProxiedProducer.__init__ for rationale. + self.config = getattr(consumer, "config", None) def close(self, *args, **kwargs): return ConfluentKafkaInstrumentor.wrap_close( @@ -367,11 +394,15 @@ def wrap_produce(func, instance, tracer, args, kwargs): topic = KafkaPropertiesExtractor.extract_produce_topic( args, kwargs ) + bootstrap_servers = ( + KafkaPropertiesExtractor.extract_bootstrap_servers(instance) + ) _enrich_span( span, topic, - operation=MessagingOperationTypeValues.RECEIVE, - ) # Replace + operation=MessagingOperationTypeValues.PUBLISH, + bootstrap_servers=bootstrap_servers, + ) # Publish propagate.inject( headers, setter=_kafka_setter, @@ -385,6 +416,9 @@ def wrap_poll(func, instance, tracer, args, kwargs): record = func(*args, **kwargs) if record: + bootstrap_servers = ( + KafkaPropertiesExtractor.extract_bootstrap_servers(instance) + ) with tracer.start_as_current_span( "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER ): @@ -395,6 +429,7 @@ def wrap_poll(func, instance, tracer, args, kwargs): record.partition(), record.offset(), operation=MessagingOperationTypeValues.PROCESS, + bootstrap_servers=bootstrap_servers, ) instance._current_context_token = context.attach( trace.set_span_in_context(instance._current_consume_span) @@ -409,6 +444,9 @@ def wrap_consume(func, instance, tracer, args, kwargs): records = func(*args, **kwargs) if len(records) > 0: + bootstrap_servers = ( + KafkaPropertiesExtractor.extract_bootstrap_servers(instance) + ) with tracer.start_as_current_span( "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER ): @@ -417,6 +455,7 @@ def wrap_consume(func, instance, tracer, args, kwargs): instance._current_consume_span, records[0].topic(), operation=MessagingOperationTypeValues.PROCESS, + bootstrap_servers=bootstrap_servers, ) instance._current_context_token = context.attach( trace.set_span_in_context(instance._current_consume_span) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py index f7b5c059bb..5b5fa99318 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py @@ -9,6 +9,10 @@ MESSAGING_SYSTEM, MessagingOperationTypeValues, ) +from opentelemetry.semconv.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, SpanAttributes, @@ -21,7 +25,17 @@ class KafkaPropertiesExtractor: @staticmethod def extract_bootstrap_servers(instance): - return instance.config.get("bootstrap_servers") + config = getattr(instance, "config", None) + if not isinstance(config, dict): + return None + # confluent-kafka uses the dotted key "bootstrap.servers"; also accept + # the python-style "bootstrap_servers" for robustness. + servers = config.get("bootstrap.servers") or config.get( + "bootstrap_servers" + ) + if isinstance(servers, (list, tuple)): + servers = ",".join(str(s) for s in servers) + return servers @staticmethod def _extract_argument(key, position, default_value, args, kwargs): @@ -115,12 +129,35 @@ def _get_links_from_records(records): return links +def _set_bootstrap_servers_attributes(span, bootstrap_servers): + """Populate server.address and server.port from a bootstrap.servers + string (e.g. ``host1:9092,host2:9092``).""" + if not bootstrap_servers: + return + + first_broker = bootstrap_servers.split(",")[0].strip() + if not first_broker: + return + + if ":" in first_broker: + host, _, port = first_broker.rpartition(":") + span.set_attribute(SERVER_ADDRESS, host) + try: + span.set_attribute(SERVER_PORT, int(port)) + except ValueError: + # Port wasn't numeric; skip rather than emit a bad attribute. + _LOG.debug("non-numeric port in bootstrap.servers: %r", port) + else: + span.set_attribute(SERVER_ADDRESS, first_broker) + + def _enrich_span( span, topic, partition: Optional[int] = None, offset: Optional[int] = None, operation: Optional[MessagingOperationTypeValues] = None, + bootstrap_servers: Optional[str] = None, ): if not span.is_recording(): return @@ -141,6 +178,8 @@ def _enrich_span( else: span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) + _set_bootstrap_servers_attributes(span, bootstrap_servers) + # https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic # A message within Kafka is uniquely defined by its topic name, topic partition and offset. if partition is not None and offset is not None and topic: diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 772ecd09ee..6ca603ba3e 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -30,6 +30,10 @@ MESSAGING_OPERATION, MESSAGING_SYSTEM, ) +from opentelemetry.semconv.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) from opentelemetry.semconv.trace import ( MessagingDestinationKindValues, SpanAttributes, @@ -447,3 +451,45 @@ def test_producer_flush(self) -> None: span_list = self.memory_exporter.get_finished_spans() self._assert_span_count(span_list, 1) self._assert_topic(span_list[0], "topic-1") + + def test_producer_sets_bootstrap_servers_attributes(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + producer = MockedProducer( + [], + { + "bootstrap.servers": "broker-a:9092,broker-b:9093", + }, + ) + + producer = instrumentation.instrument_producer(producer) + producer.produce(topic="topic-1", key="k", value="v") + + span = self.memory_exporter.get_finished_spans()[0] + self.assertEqual(span.attributes[SERVER_ADDRESS], "broker-a") + self.assertEqual(span.attributes[SERVER_PORT], 9092) + + def test_consumer_sets_bootstrap_servers_attributes(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + consumer = MockConsumer( + [MockedMessage("topic-1", 0, 0, [])], + { + "bootstrap.servers": "broker-1:9092", + "group.id": "g", + "auto.offset.reset": "earliest", + }, + ) + + self.memory_exporter.clear() + consumer = instrumentation.instrument_consumer(consumer) + consumer.poll() + # Second (empty) poll ends the in-flight ` process` span so it + # shows up in the exporter. + consumer.poll() + + process_span = next( + s + for s in self.memory_exporter.get_finished_spans() + if s.name == "topic-1 process" + ) + self.assertEqual(process_span.attributes[SERVER_ADDRESS], "broker-1") + self.assertEqual(process_span.attributes[SERVER_PORT], 9092) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py index f87dbd6576..0d359ebaa7 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -6,6 +6,7 @@ class MockConsumer(Consumer): def __init__(self, queue, config): self._queue = queue + self.config = config super().__init__(config) def consume(self, num_messages=1, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg @@ -58,6 +59,7 @@ def value(self): class MockedProducer(Producer): def __init__(self, queue, config): self._queue = queue + self.config = config super().__init__(config) def produce(self, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg