Describe your environment
- opentelemetry-instrumentation-pika 0.52b1 (also present on main)
- pika BlockingConnection / BlockingChannel
What happened?
When several consumers are registered on a single channel (one basic_consume
per queue), a single message delivery produces multiple nested CONSUMER spans
instead of one. The duplication count tracks registration order: the
first-registered consumer ends up with N spans (N = number of consumers on the
channel), the second with N-1, and so on.
Steps to Reproduce
Against a local RabbitMQ (guest/guest):
from collections import Counter
import pika
from opentelemetry import trace
from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
trace.set_tracer_provider(provider)
PikaInstrumentor().instrument()
def on_msg(channel, method, properties, body):
    channel.basic_ack(method.delivery_tag)

conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = conn.channel()
queues = ("q1", "q2", "q3")
for q in queues:
    ch.queue_declare(q)
    ch.basic_consume(q, on_message_callback=on_msg)
# one message to the FIRST-registered queue
ch.basic_publish(exchange="", routing_key="q1", body=b"hello")
conn.process_data_events(time_limit=1)
spans = exporter.get_finished_spans()
print(Counter(s.name for s in spans if s.kind.name == "CONSUMER"))
Expected Result
One CONSUMER span per delivery, regardless of how many consumers are registered
on the channel.
Actual Result
Observed (3 consumers on the channel):
Counter({'q1 receive': 3})
The single delivery to q1 emits 3 CONSUMER spans. Registering only q1
emits 1, as expected. The count equals the number of consumers registered on
the channel at or after the delivered one.
Additional context
Possible cause
The wrapper installed by _decorate_basic_consume re-runs
_instrument_channel_consumers(channel) after every basic_consume call, and
that method decorates every entry in channel._consumer_infos without
checking whether the callback is already decorated:
decorated_callback = utils._decorate_callback(consumer_callback, ...)
setattr(decorated_callback, "_original_callback", consumer_callback)
setattr(consumer_info, callback_attr, decorated_callback)
So each additional basic_consume re-wraps the callbacks registered before it,
stacking decorators on the earlier consumers. The uninstrument path is
asymmetric here — it does guard on _original_function before unwrapping.
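To illustrate the suspected mechanism without a broker, here is a minimal stand-alone simulation. It does not import pika or OpenTelemetry, and every name in it (ConsumerInfo, decorate, instrument_all, simulate) is a hypothetical stand-in rather than the library's actual code. It only models "re-wrap every registered callback after each registration" and counts how many wrapper layers fire for one delivery.

```python
# Stand-alone model of the suspected stacking; NOT the library's code.
# ConsumerInfo, decorate, instrument_all and simulate are hypothetical names.

class ConsumerInfo:
    def __init__(self, queue, callback):
        self.queue = queue
        self.on_message_callback = callback


def decorate(callback, spans, queue):
    def wrapper(body):
        # Stands in for starting one CONSUMER span.
        spans.append(f"{queue} receive")
        return callback(body)

    wrapper._original_callback = callback
    return wrapper


def instrument_all(consumer_infos, spans):
    # Models the reported behavior: every entry is wrapped again,
    # with no check for an existing wrapper.
    for info in consumer_infos:
        info.on_message_callback = decorate(
            info.on_message_callback, spans, info.queue
        )


def simulate(queues, deliver_to):
    spans = []
    consumer_infos = []
    for q in queues:
        consumer_infos.append(ConsumerInfo(q, lambda body: None))
        # Re-run after every registration, as the wrapper is believed to do.
        instrument_all(consumer_infos, spans)
    for info in consumer_infos:
        if info.queue == deliver_to:
            info.on_message_callback(b"hello")
    return len(spans)


if __name__ == "__main__":
    for q in ("q1", "q2", "q3"):
        print(q, simulate(("q1", "q2", "q3"), q))
```

Under this model, a delivery to the first of three registered queues passes through three wrapper layers, the second through two, and the third through one, which matches the N, N-1, ... pattern described above.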
Possible fix
Skip callbacks that are already decorated in _instrument_channel_consumers:
if getattr(consumer_callback, "_original_callback", None) is not None:
    continue
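As a rough check that a guard of this shape is enough, here is the same kind of stand-alone model with the check added. Again, nothing here is the library's code: decorate, instrument_all_guarded and simulate_guarded are hypothetical names, and a plain dict stands in for channel._consumer_infos.

```python
# Stand-alone model of the proposed guard; NOT the library's code.
# All function names below are hypothetical.

def decorate(callback, spans, queue):
    def wrapper(body):
        # Stands in for starting one CONSUMER span.
        spans.append(f"{queue} receive")
        return callback(body)

    wrapper._original_callback = callback
    return wrapper


def instrument_all_guarded(callbacks, spans):
    for queue, callback in list(callbacks.items()):
        # Proposed guard: leave already-decorated callbacks alone.
        if getattr(callback, "_original_callback", None) is not None:
            continue
        callbacks[queue] = decorate(callback, spans, queue)


def simulate_guarded(queues, deliver_to):
    spans = []
    callbacks = {}  # stand-in for the channel's consumer registry
    for q in queues:
        callbacks[q] = lambda body: None
        # Still re-run after every registration; the guard makes it idempotent.
        instrument_all_guarded(callbacks, spans)
    callbacks[deliver_to](b"hello")
    return len(spans)


if __name__ == "__main__":
    for q in ("q1", "q2", "q3"):
        print(q, simulate_guarded(("q1", "q2", "q3"), q))
```

In this model each delivery yields exactly one span regardless of registration order. One caveat: the guard relies on the marker attribute being set on the wrapper, so a user callback that happens to carry its own _original_callback attribute would be skipped.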
Would you like to implement a fix?
None