Skip to content

Commit 033fabe

Browse files
committed
celery: allow using links instead of child spans for task execution
1 parent b5a773f commit 033fabe

3 files changed

Lines changed: 94 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1919
([#4335](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4335))
2020
- Expand `AGENTS.md` with instrumentation/GenAI guidance and add PR review instructions.
2121
([#4457](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4457))
22+
- `opentelemetry-instrumentation-celery`: Add `use_span_links` parameter to allow creating span links instead of parent-child relationships between task creation and execution spans.
23+
([#3002](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3002))
2224

2325
### Fixed
2426

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,16 @@ def add(x, y):
4747
4848
add.delay(42, 50)
4949
50+
Configuration
51+
-------------
52+
53+
The ``CeleryInstrumentor().instrument()`` method accepts the following arguments:
54+
55+
* ``use_span_links`` (bool): When ``True``, Celery task execution spans will be linked to the
56+
task creation spans instead of being created as child spans. This provides a looser
57+
coupling between spans in distributed systems. Defaults to ``False`` to maintain
58+
backward compatibility.
59+
5060
Setting up tracing
5161
------------------
5262
@@ -134,6 +144,7 @@ def instrumentation_dependencies(self) -> Collection[str]:
134144

135145
def _instrument(self, **kwargs):
136146
tracer_provider = kwargs.get("tracer_provider")
147+
use_span_links = kwargs.get("use_span_links", False)
137148

138149
# pylint: disable=attribute-defined-outside-init
139150
self._tracer = trace.get_tracer(
@@ -142,6 +153,8 @@ def _instrument(self, **kwargs):
142153
tracer_provider,
143154
schema_url="https://opentelemetry.io/schemas/1.11.0",
144155
)
156+
# pylint: disable=attribute-defined-outside-init
157+
self._use_span_links = use_span_links
145158

146159
meter_provider = kwargs.get("meter_provider")
147160
meter = get_meter(
@@ -182,14 +195,32 @@ def _trace_prerun(self, *args, **kwargs):
182195
self.update_task_duration_time(task_id)
183196
request = task.request
184197
tracectx = extract(request, getter=celery_getter) or None
185-
token = context_api.attach(tracectx) if tracectx is not None else None
186198

187199
logger.debug("prerun signal start task_id=%s", task_id)
188200

189201
operation_name = f"{_TASK_RUN}/{task.name}"
190-
span = self._tracer.start_span(
191-
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
192-
)
202+
203+
if self._use_span_links and tracectx is not None:
204+
parent_span_context = trace.get_current_span(
205+
tracectx
206+
).get_span_context()
207+
links = (
208+
[trace.Link(parent_span_context)]
209+
if parent_span_context.is_valid
210+
else None
211+
)
212+
span = self._tracer.start_span(
213+
operation_name, links=links, kind=trace.SpanKind.CONSUMER
214+
)
215+
# Don't attach the context when using links to avoid parent-child relationship
216+
token = None
217+
else:
218+
token = (
219+
context_api.attach(tracectx) if tracectx is not None else None
220+
)
221+
span = self._tracer.start_span(
222+
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
223+
)
193224

194225
activation = trace.use_span(span, end_on_exit=True)
195226
activation.__enter__() # pylint: disable=unnecessary-dunder-call

instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,63 @@ def _retrieve_context_wrapper_none_token(
233233

234234
unwrap(utils, "retrieve_context")
235235

236+
def test_task_use_span_links(self):
237+
CeleryInstrumentor().instrument(use_span_links=True)
238+
239+
result = task_add.delay(1, 2)
240+
241+
timeout = time.time() + 60 * 1 # 1 minute from now
242+
while not result.ready():
243+
if time.time() > timeout:
244+
break
245+
time.sleep(0.05)
246+
247+
spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
248+
self.assertEqual(len(spans), 2)
249+
250+
consumer, producer = spans
251+
252+
self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add")
253+
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
254+
self.assertSpanHasAttributes(
255+
consumer,
256+
{
257+
"celery.action": "run",
258+
"celery.state": "SUCCESS",
259+
SpanAttributes.MESSAGING_DESTINATION: "celery",
260+
"celery.task_name": "tests.celery_test_tasks.task_add",
261+
},
262+
)
263+
264+
self.assertEqual(consumer.status.status_code, StatusCode.UNSET)
265+
self.assertEqual(0, len(consumer.events))
266+
267+
self.assertEqual(
268+
producer.name, "apply_async/tests.celery_test_tasks.task_add"
269+
)
270+
self.assertEqual(producer.kind, SpanKind.PRODUCER)
271+
self.assertSpanHasAttributes(
272+
producer,
273+
{
274+
"celery.action": "apply_async",
275+
"celery.task_name": "tests.celery_test_tasks.task_add",
276+
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
277+
SpanAttributes.MESSAGING_DESTINATION: "celery",
278+
},
279+
)
280+
281+
# Verify that consumer span is not a child of producer span when using links
282+
self.assertIsNone(consumer.parent)
283+
self.assertNotEqual(
284+
consumer.context.trace_id, producer.context.trace_id
285+
)
286+
287+
# Verify that consumer span has a link to the producer span
288+
self.assertEqual(len(consumer.links), 1)
289+
link = consumer.links[0]
290+
self.assertEqual(link.context.span_id, producer.context.span_id)
291+
self.assertEqual(link.context.trace_id, producer.context.trace_id)
292+
236293

237294
class TestCelerySignatureTask(TestBase):
238295
def setUp(self):

0 commit comments

Comments
 (0)