Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `opentelemetry-docker-tests`: Replace deprecated `SpanAttributes` from `opentelemetry.semconv.trace` with `opentelemetry.semconv._incubating.attributes`
([#4339](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4339))
- `opentelemetry-instrumentation-confluent-kafka`: Skip `recv` span creation when `poll()` returns no message or `consume()` returns an empty list, avoiding empty spans on idle polls
([#4349](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4349))
- Fix intermittent `Core Contrib Test` CI failures caused by GitHub git CDN SHA propagation lag by installing core packages from the already-checked-out local copy instead of a second git clone
([#4305](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4305))
- Don't import module in unwrap if not already imported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,11 @@ def wrap_poll(func, instance, tracer, args, kwargs):
if instance._current_consume_span:
_end_current_consume_span(instance)

with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
record = func(*args, **kwargs)
if record:
record = func(*args, **kwargs)
Comment thread
sterchelen marked this conversation as resolved.
if record:
with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
_create_new_consume_span(instance, tracer, [record])
_enrich_span(
instance._current_consume_span,
Expand All @@ -396,9 +396,9 @@ def wrap_poll(func, instance, tracer, args, kwargs):
record.offset(),
operation=MessagingOperationTypeValues.PROCESS,
)
instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
)
instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
)

return record

Expand All @@ -407,21 +407,20 @@ def wrap_consume(func, instance, tracer, args, kwargs):
if instance._current_consume_span:
_end_current_consume_span(instance)

with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
records = func(*args, **kwargs)
if len(records) > 0:
records = func(*args, **kwargs)
if len(records) > 0:
with tracer.start_as_current_span(
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
):
_create_new_consume_span(instance, tracer, records)
_enrich_span(
instance._current_consume_span,
records[0].topic(),
operation=MessagingOperationTypeValues.PROCESS,
)

instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
)
instance._current_context_token = context.attach(
trace.set_span_in_context(instance._current_consume_span)
)

return records

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ def test_poll(self) -> None:
MESSAGING_MESSAGE_ID: "topic-30.1.3",
},
},
{"name": "recv", "attributes": {}},
]

consumer = MockConsumer(
Expand All @@ -213,7 +212,7 @@ def test_poll(self) -> None:
consumer.poll()
consumer.poll()
consumer.poll()
consumer.poll()
consumer.poll() # empty poll — must not produce a span

span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)
Expand Down Expand Up @@ -259,7 +258,6 @@ def test_consume(self) -> None:
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
},
},
{"name": "recv", "attributes": {}},
]

consumer = MockConsumer(
Expand All @@ -276,10 +274,86 @@ def test_consume(self) -> None:
consumer.consume(3)
consumer.consume(1)
consumer.consume(2)
consumer.consume(1)
consumer.consume(1) # empty consume — must not produce a span
span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def test_poll_empty_does_not_create_span(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
consumer = MockConsumer(
[],
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)
self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.poll()
consumer.poll()

span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 0)

def test_consume_empty_does_not_create_span(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
consumer = MockConsumer(
[],
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)
self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.consume(5)
consumer.consume(5)

span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 0)

def test_poll_empty_cleans_up_previous_span_and_token(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
consumer = MockConsumer(
[MockedMessage("topic-1", 0, 0, [])],
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)
consumer = instrumentation.instrument_consumer(consumer)
consumer.poll() # non-empty: sets _current_consume_span and _current_context_token
self.assertIsNotNone(consumer._current_consume_span)
self.assertIsNotNone(consumer._current_context_token)

consumer.poll() # empty: should clean up both
self.assertIsNone(consumer._current_consume_span)
self.assertIsNone(consumer._current_context_token)

def test_consume_empty_cleans_up_previous_span_and_token(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
consumer = MockConsumer(
[MockedMessage("topic-1", 0, 0, [])],
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)
consumer = instrumentation.instrument_consumer(consumer)
consumer.consume(
1
) # non-empty: sets _current_consume_span and _current_context_token
self.assertIsNotNone(consumer._current_consume_span)
self.assertIsNotNone(consumer._current_context_token)

consumer.consume(1) # empty: should clean up both
self.assertIsNone(consumer._current_consume_span)
self.assertIsNone(consumer._current_context_token)

def test_close(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
Expand Down
Loading