pika-instrumentation: Multiple basic_consume calls on one channel produce duplicate (nested) CONSUMER spans #4667

@bjornars

Describe your environment

  • opentelemetry-instrumentation-pika 0.52b1 (also present on main)
  • pika BlockingConnection / BlockingChannel

What happened?

When several consumers are registered on a single channel (one basic_consume
per queue), a single message delivery produces multiple nested CONSUMER spans
instead of one. The duplication count tracks registration order: the
first-registered consumer ends up with N spans (N = number of consumers on the
channel), the second with N-1, and so on.

Steps to Reproduce

Against a local RabbitMQ (guest/guest):

from collections import Counter

import pika
from opentelemetry import trace
from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
trace.set_tracer_provider(provider)
PikaInstrumentor().instrument()


def on_msg(channel, method, properties, body):
    channel.basic_ack(method.delivery_tag)


conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
ch = conn.channel()
queues = ("q1", "q2", "q3")
for q in queues:
    ch.queue_declare(q)
    ch.basic_consume(q, on_message_callback=on_msg)

# one message to the FIRST-registered queue
ch.basic_publish(exchange="", routing_key="q1", body=b"hello")
conn.process_data_events(time_limit=1)

spans = exporter.get_finished_spans()
print(Counter(s.name for s in spans if s.kind.name == "CONSUMER"))

Expected Result

One CONSUMER span per delivery, regardless of how many consumers are registered
on the channel.

Actual Result

Observed (3 consumers on the channel):

Counter({'q1 receive': 3})

The single delivery to q1 emits 3 CONSUMER spans. Registering only q1
emits 1, as expected. In general, the span count for a delivery equals the
number of consumers registered on the channel at or after the consumer that
received it, counted in registration order.

Additional context

Possible cause

The wrapper installed by _decorate_basic_consume re-runs
_instrument_channel_consumers(channel) after every basic_consume call, and
that method decorates every entry in channel._consumer_infos without
checking whether the callback is already decorated:

decorated_callback = utils._decorate_callback(consumer_callback, ...)
setattr(decorated_callback, "_original_callback", consumer_callback)
setattr(consumer_info, callback_attr, decorated_callback)

So each additional basic_consume re-wraps the callbacks registered before it,
stacking decorators on the earlier consumers. The uninstrument path is
asymmetric here: it does guard on _original_function before unwrapping.
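
To illustrate the stacking without a broker, here is a minimal pure-Python model of the behavior described above. It is a sketch, not the instrumentation's actual code: decorate_callback, instrument_channel_consumers, basic_consume, and deliver are hypothetical stand-ins, a plain dict plays the role of channel._consumer_infos, and the attribute name on_message_callback is an assumption made for the example.

```python
from types import SimpleNamespace

spans = []  # stand-in for exported CONSUMER spans


def decorate_callback(callback, tag):
    """Hypothetical stand-in for the real decorator: record one span per call."""
    def decorated(*args, **kwargs):
        spans.append(tag)
        return callback(*args, **kwargs)
    return decorated


def instrument_channel_consumers(consumer_infos):
    """Wrap every registered callback, with no check for prior decoration."""
    for tag, info in consumer_infos.items():
        original = info.on_message_callback
        decorated = decorate_callback(original, tag)
        decorated._original_callback = original
        info.on_message_callback = decorated


def basic_consume(consumer_infos, tag, callback):
    """Register a consumer, then re-instrument the whole channel."""
    consumer_infos[tag] = SimpleNamespace(on_message_callback=callback)
    instrument_channel_consumers(consumer_infos)


def deliver(consumer_infos, tag):
    """Deliver one message to `tag` and return how many spans it produced."""
    spans.clear()
    consumer_infos[tag].on_message_callback(b"hello")
    return len(spans)


consumer_infos = {}
for q in ("q1", "q2", "q3"):
    basic_consume(consumer_infos, q, lambda body: None)

counts = {q: deliver(consumer_infos, q) for q in ("q1", "q2", "q3")}
print(counts)  # {'q1': 3, 'q2': 2, 'q3': 1}
```

In this model the first-registered callback is wrapped once per basic_consume call, which matches the N, N-1, ... pattern reported above.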

Possible fix

Skip callbacks that are already decorated in _instrument_channel_consumers:

if getattr(consumer_callback, "_original_callback", None) is not None:
    continue
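
The effect of such a guard can be checked in a small pure-Python model. This is a sketch under assumptions, not a patch against the library: the helper names are hypothetical, a plain dict stands in for channel._consumer_infos, and on_message_callback is an assumed attribute name.

```python
from types import SimpleNamespace

spans = []  # stand-in for exported CONSUMER spans


def decorate_callback(callback, tag):
    """Hypothetical stand-in for the real decorator: record one span per call."""
    def decorated(*args, **kwargs):
        spans.append(tag)
        return callback(*args, **kwargs)
    return decorated


def instrument_channel_consumers(consumer_infos):
    """Wrap each callback at most once by checking for the marker attribute."""
    for tag, info in consumer_infos.items():
        callback = info.on_message_callback
        # Guard: a callback carrying _original_callback is already decorated.
        if getattr(callback, "_original_callback", None) is not None:
            continue
        decorated = decorate_callback(callback, tag)
        decorated._original_callback = callback
        info.on_message_callback = decorated


consumer_infos = {}
for q in ("q1", "q2", "q3"):
    # Model of basic_consume: register, then re-instrument the whole channel.
    consumer_infos[q] = SimpleNamespace(on_message_callback=lambda body: None)
    instrument_channel_consumers(consumer_infos)

counts = {}
for q in consumer_infos:
    spans.clear()
    consumer_infos[q].on_message_callback(b"hello")
    counts[q] = len(spans)

print(counts)  # {'q1': 1, 'q2': 1, 'q3': 1}
```

With the guard in place, repeated instrumentation passes become idempotent in this model, so each delivery records a single span regardless of registration order.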

Would you like to implement a fix?

None
