diff --git a/.changelog/4537.added b/.changelog/4537.added new file mode 100644 index 0000000000..cabb98b871 --- /dev/null +++ b/.changelog/4537.added @@ -0,0 +1 @@ +`opentelemetry-instrumentation-celery`: add `use_span_links` option to link task execution spans to producer spans instead of creating parent-child relationships diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index e6b1d37fbf..a8db0fa3eb 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -36,6 +36,16 @@ def add(x, y): add.delay(42, 50) +Configuration +------------- + +The ``CeleryInstrumentor().instrument()`` method accepts the following arguments: + +* ``use_span_links`` (bool): When ``True``, Celery task execution spans will be linked to the + task creation spans instead of being created as child spans. This provides a looser + coupling between spans in distributed systems. Defaults to ``False`` to maintain + backward compatibility. + Setting up tracing ------------------ @@ -126,6 +136,7 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider") + use_span_links = kwargs.get("use_span_links", False) self._tracer = trace.get_tracer( __name__, @@ -133,6 +144,7 @@ def _instrument(self, **kwargs): tracer_provider, schema_url="https://opentelemetry.io/schemas/1.11.0", ) + self._use_span_links = use_span_links meter_provider = kwargs.get("meter_provider") meter = get_meter( @@ -175,14 +187,32 @@ def _trace_prerun(self, *args, **kwargs): self.update_task_duration_time(task_id) request = task.request tracectx = extract(request, getter=celery_getter) or None - token = context_api.attach(tracectx) if tracectx is not None else None logger.debug("prerun signal start task_id=%s", task_id) operation_name = f"{_TASK_RUN}/{task.name}" - span = self._tracer.start_span( - operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER - ) + + if self._use_span_links and tracectx is not None: + parent_span_context = trace.get_current_span( + tracectx + ).get_span_context() + links = ( + [trace.Link(parent_span_context)] + if parent_span_context.is_valid + else None + ) + span = self._tracer.start_span( + operation_name, links=links, kind=trace.SpanKind.CONSUMER + ) + # Don't attach the context when using links to avoid parent-child relationship + token = None + else: + token = ( + context_api.attach(tracectx) if tracectx is not None else None + ) + span = self._tracer.start_span( + operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER + ) activation = trace.use_span(span, end_on_exit=True) activation.__enter__() # pylint: disable=unnecessary-dunder-call diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 42bc5df09a..79d35e8785 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -240,6 +240,63 @@ def _retrieve_context_wrapper_none_token( unwrap(utils, "retrieve_context") + def test_task_use_span_links(self): + CeleryInstrumentor().instrument(use_span_links=True) + + result = task_add.delay(1, 2) + + timeout = time.time() + 60 * 1 # 1 minute from now + while not result.ready(): + if time.time() > timeout: + break + time.sleep(0.05) + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 2) + + consumer, producer = spans + + self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add") + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + self.assertSpanHasAttributes( + consumer, + { + "celery.action": "run", + "celery.state": "SUCCESS", + SpanAttributes.MESSAGING_DESTINATION: "celery", + "celery.task_name": "tests.celery_test_tasks.task_add", + }, + ) + + self.assertEqual(consumer.status.status_code, StatusCode.UNSET) + self.assertEqual(0, len(consumer.events)) + + self.assertEqual( + producer.name, "apply_async/tests.celery_test_tasks.task_add" + ) + self.assertEqual(producer.kind, SpanKind.PRODUCER) + self.assertSpanHasAttributes( + producer, + { + "celery.action": "apply_async", + "celery.task_name": "tests.celery_test_tasks.task_add", + SpanAttributes.MESSAGING_DESTINATION_KIND: "queue", + SpanAttributes.MESSAGING_DESTINATION: "celery", + }, + ) + + # Verify that consumer span is not a child of producer span when using links + self.assertIsNone(consumer.parent) + self.assertNotEqual( + consumer.context.trace_id, producer.context.trace_id + ) + + # Verify that consumer span has a link to the producer span + self.assertEqual(len(consumer.links), 1) + link = consumer.links[0] + self.assertEqual(link.context.span_id, producer.context.span_id) + self.assertEqual(link.context.trace_id, producer.context.trace_id) + class TestCelerySignatureTask(TestBase): def setUp(self): diff --git a/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py index fdb63e3080..0312a95af8 100644 --- a/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py +++ b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py @@ -535,3 +535,36 @@ def fn_task(): span = spans_list[0] assert span.resource == resource + + +def test_use_span_links(celery_app, tracer_provider, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + CeleryInstrumentor().uninstrument() + CeleryInstrumentor().instrument( + tracer_provider=tracer_provider, use_span_links=True + ) + + result = fn_task.apply_async() + assert result.get(timeout=ASYNC_GET_TIMEOUT) == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + + run_span = next( + s for s in spans if s.attributes.get("celery.action") == "run" + ) + async_span = next( + s for s in spans if s.attributes.get("celery.action") == "apply_async" + ) + + # The run span should not be a child of the async span when using links. + assert run_span.parent is None + assert run_span.context.trace_id != async_span.context.trace_id + + assert len(run_span.links) == 1 + link = run_span.links[0] + assert link.context.span_id == async_span.context.span_id + assert link.context.trace_id == async_span.context.trace_id