diff --git a/CHANGELOG.md b/CHANGELOG.md index 564340ae26..3c9c67be4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-docker-tests`: Replace deprecated `SpanAttributes` from `opentelemetry.semconv.trace` with `opentelemetry.semconv._incubating.attributes` ([#4339](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4339)) +- `opentelemetry-instrumentation-confluent-kafka`: Skip `recv` span creation when `poll()` returns no message or `consume()` returns an empty list, avoiding empty spans on idle polls + ([#4349](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4349)) - Fix intermittent `Core Contrib Test` CI failures caused by GitHub git CDN SHA propagation lag by installing core packages from the already-checked-out local copy instead of a second git clone ([#4305](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4305)) - Don't import module in unwrap if not already imported diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index f4606bc4d9..ed390d7006 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -383,11 +383,11 @@ def wrap_poll(func, instance, tracer, args, kwargs): if instance._current_consume_span: _end_current_consume_span(instance) - with tracer.start_as_current_span( - "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER - ): - record = func(*args, **kwargs) - if record: + record = func(*args, **kwargs) + if record: + with tracer.start_as_current_span( + "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER + ): _create_new_consume_span(instance, tracer, [record]) _enrich_span( instance._current_consume_span, @@ -396,9 +396,9 @@ def wrap_poll(func, instance, tracer, args, kwargs): record.offset(), operation=MessagingOperationTypeValues.PROCESS, ) - instance._current_context_token = context.attach( - trace.set_span_in_context(instance._current_consume_span) - ) + instance._current_context_token = context.attach( + trace.set_span_in_context(instance._current_consume_span) + ) return record @@ -407,21 +407,20 @@ def wrap_consume(func, instance, tracer, args, kwargs): if instance._current_consume_span: _end_current_consume_span(instance) - with tracer.start_as_current_span( - "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER - ): - records = func(*args, **kwargs) - if len(records) > 0: + records = func(*args, **kwargs) + if len(records) > 0: + with tracer.start_as_current_span( + "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER + ): _create_new_consume_span(instance, tracer, records) _enrich_span( instance._current_consume_span, records[0].topic(), operation=MessagingOperationTypeValues.PROCESS, ) - - instance._current_context_token = context.attach( - trace.set_span_in_context(instance._current_consume_span) - ) + instance._current_context_token = context.attach( + trace.set_span_in_context(instance._current_consume_span) + ) return records diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 365ac333d9..772ecd09ee 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -197,7 +197,6 @@ def test_poll(self) -> None: MESSAGING_MESSAGE_ID: "topic-30.1.3", }, }, - {"name": "recv", "attributes": {}}, ] consumer = MockConsumer( @@ -213,7 +212,7 @@ def test_poll(self) -> None: consumer.poll() consumer.poll() consumer.poll() - consumer.poll() + consumer.poll() # empty poll — must not produce a span span_list = self.memory_exporter.get_finished_spans() self._compare_spans(span_list, expected_spans) @@ -259,7 +258,6 @@ def test_consume(self) -> None: SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, }, }, - {"name": "recv", "attributes": {}}, ] consumer = MockConsumer( @@ -276,10 +274,86 @@ def test_consume(self) -> None: consumer.consume(3) consumer.consume(1) consumer.consume(2) - consumer.consume(1) + consumer.consume(1) # empty consume — must not produce a span span_list = self.memory_exporter.get_finished_spans() self._compare_spans(span_list, expected_spans) + def test_poll_empty_does_not_create_span(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + consumer = MockConsumer( + [], + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + }, + ) + self.memory_exporter.clear() + consumer = instrumentation.instrument_consumer(consumer) + consumer.poll() + consumer.poll() + + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 0) + + def test_consume_empty_does_not_create_span(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + consumer = MockConsumer( + [], + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + }, + ) + self.memory_exporter.clear() + consumer = instrumentation.instrument_consumer(consumer) + consumer.consume(5) + consumer.consume(5) + + span_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(span_list), 0) + + def test_poll_empty_cleans_up_previous_span_and_token(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + consumer = MockConsumer( + [MockedMessage("topic-1", 0, 0, [])], + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + }, + ) + consumer = instrumentation.instrument_consumer(consumer) + consumer.poll() # non-empty: sets _current_consume_span and _current_context_token + self.assertIsNotNone(consumer._current_consume_span) + self.assertIsNotNone(consumer._current_context_token) + + consumer.poll() # empty: should clean up both + self.assertIsNone(consumer._current_consume_span) + self.assertIsNone(consumer._current_context_token) + + def test_consume_empty_cleans_up_previous_span_and_token(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + consumer = MockConsumer( + [MockedMessage("topic-1", 0, 0, [])], + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + }, + ) + consumer = instrumentation.instrument_consumer(consumer) + consumer.consume( + 1 + ) # non-empty: sets _current_consume_span and _current_context_token + self.assertIsNotNone(consumer._current_consume_span) + self.assertIsNotNone(consumer._current_context_token) + + consumer.consume(1) # empty: should clean up both + self.assertIsNone(consumer._current_consume_span) + self.assertIsNone(consumer._current_context_token) + def test_close(self) -> None: instrumentation = ConfluentKafkaInstrumentor() mocked_messages = [