Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/4537.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`opentelemetry-instrumentation-celery`: add `use_span_links` option to link task execution spans to producer spans instead of creating parent-child relationships
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ def add(x, y):

add.delay(42, 50)

Configuration
-------------

The ``CeleryInstrumentor().instrument()`` method accepts the following arguments:

* ``use_span_links`` (bool): When ``True``, Celery task execution spans will be linked to the
task creation spans instead of being created as child spans. This provides a looser
coupling between spans in distributed systems. Defaults to ``False`` to maintain
backward compatibility.

Setting up tracing
------------------

Expand Down Expand Up @@ -126,13 +136,15 @@ def instrumentation_dependencies(self) -> Collection[str]:

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
use_span_links = kwargs.get("use_span_links", False)

self._tracer = trace.get_tracer(
__name__,
__version__,
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
self._use_span_links = use_span_links

meter_provider = kwargs.get("meter_provider")
meter = get_meter(
Expand Down Expand Up @@ -175,14 +187,32 @@ def _trace_prerun(self, *args, **kwargs):
self.update_task_duration_time(task_id)
request = task.request
tracectx = extract(request, getter=celery_getter) or None
token = context_api.attach(tracectx) if tracectx is not None else None

logger.debug("prerun signal start task_id=%s", task_id)

operation_name = f"{_TASK_RUN}/{task.name}"
span = self._tracer.start_span(
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
)

if self._use_span_links and tracectx is not None:
parent_span_context = trace.get_current_span(
tracectx
).get_span_context()
links = (
[trace.Link(parent_span_context)]
if parent_span_context.is_valid
else None
)
span = self._tracer.start_span(
operation_name, links=links, kind=trace.SpanKind.CONSUMER
)
# Don't attach the context when using links to avoid parent-child relationship
token = None
else:
token = (
context_api.attach(tracectx) if tracectx is not None else None
)
span = self._tracer.start_span(
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
)

activation = trace.use_span(span, end_on_exit=True)
activation.__enter__() # pylint: disable=unnecessary-dunder-call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,63 @@ def _retrieve_context_wrapper_none_token(

unwrap(utils, "retrieve_context")

def test_task_use_span_links(self):
CeleryInstrumentor().instrument(use_span_links=True)

result = task_add.delay(1, 2)

timeout = time.time() + 60 * 1 # 1 minute from now
while not result.ready():
if time.time() > timeout:
break
time.sleep(0.05)

spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(len(spans), 2)

consumer, producer = spans

self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add")
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
self.assertSpanHasAttributes(
consumer,
{
"celery.action": "run",
"celery.state": "SUCCESS",
SpanAttributes.MESSAGING_DESTINATION: "celery",
"celery.task_name": "tests.celery_test_tasks.task_add",
},
)

self.assertEqual(consumer.status.status_code, StatusCode.UNSET)
self.assertEqual(0, len(consumer.events))

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_add"
)
self.assertEqual(producer.kind, SpanKind.PRODUCER)
self.assertSpanHasAttributes(
producer,
{
"celery.action": "apply_async",
"celery.task_name": "tests.celery_test_tasks.task_add",
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
SpanAttributes.MESSAGING_DESTINATION: "celery",
},
)

# Verify that consumer span is not a child of producer span when using links
self.assertIsNone(consumer.parent)
self.assertNotEqual(
consumer.context.trace_id, producer.context.trace_id
)

# Verify that consumer span has a link to the producer span
self.assertEqual(len(consumer.links), 1)
link = consumer.links[0]
self.assertEqual(link.context.span_id, producer.context.span_id)
self.assertEqual(link.context.trace_id, producer.context.trace_id)


class TestCelerySignatureTask(TestBase):
def setUp(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,3 +535,36 @@ def fn_task():

span = spans_list[0]
assert span.resource == resource


def test_use_span_links(celery_app, tracer_provider, memory_exporter):
@celery_app.task
def fn_task():
return 42

CeleryInstrumentor().uninstrument()
CeleryInstrumentor().instrument(
tracer_provider=tracer_provider, use_span_links=True
)

result = fn_task.apply_async()
assert result.get(timeout=ASYNC_GET_TIMEOUT) == 42

spans = memory_exporter.get_finished_spans()
assert len(spans) == 2

run_span = next(
s for s in spans if s.attributes.get("celery.action") == "run"
)
async_span = next(
s for s in spans if s.attributes.get("celery.action") == "apply_async"
)

# The run span should not be a child of the async span when using links.
assert run_span.parent is None
assert run_span.context.trace_id != async_span.context.trace_id

assert len(run_span.links) == 1
link = run_span.links[0]
assert link.context.span_id == async_span.context.span_id
assert link.context.trace_id == async_span.context.trace_id
Loading