Skip to content

Commit 4262a4a

Browse files
committed
celery: allow using links instead of child spans for task execution
1 parent d2f396d commit 4262a4a

4 files changed

Lines changed: 125 additions & 4 deletions

File tree

.changelog/4537.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
`opentelemetry-instrumentation-celery`: add `use_span_links` option to link task execution spans to producer spans instead of creating parent-child relationships

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,16 @@ def add(x, y):
3636
3737
add.delay(42, 50)
3838
39+
Configuration
40+
-------------
41+
42+
The ``CeleryInstrumentor().instrument()`` method accepts the following arguments:
43+
44+
* ``use_span_links`` (bool): When ``True``, Celery task execution spans will be linked to the
45+
task creation spans instead of being created as child spans. This provides a looser
46+
coupling between spans in distributed systems. Defaults to ``False`` to maintain
47+
backward compatibility.
48+
3949
Setting up tracing
4050
------------------
4151
@@ -127,13 +137,15 @@ def instrumentation_dependencies(self) -> Collection[str]:
127137

128138
def _instrument(self, **kwargs):
129139
tracer_provider = kwargs.get("tracer_provider")
140+
use_span_links = kwargs.get("use_span_links", False)
130141

131142
self._tracer = trace.get_tracer(
132143
__name__,
133144
__version__,
134145
tracer_provider,
135146
schema_url="https://opentelemetry.io/schemas/1.11.0",
136147
)
148+
self._use_span_links = use_span_links
137149

138150
meter_provider = kwargs.get("meter_provider")
139151
meter = get_meter(
@@ -176,14 +188,32 @@ def _trace_prerun(self, *args, **kwargs):
176188
self.update_task_duration_time(task_id)
177189
request = task.request
178190
tracectx = extract(request, getter=celery_getter) or None
179-
token = context_api.attach(tracectx) if tracectx is not None else None
180191

181192
logger.debug("prerun signal start task_id=%s", task_id)
182193

183194
operation_name = f"{_TASK_RUN}/{task.name}"
184-
span = self._tracer.start_span(
185-
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
186-
)
195+
196+
if self._use_span_links and tracectx is not None:
197+
parent_span_context = trace.get_current_span(
198+
tracectx
199+
).get_span_context()
200+
links = (
201+
[trace.Link(parent_span_context)]
202+
if parent_span_context.is_valid
203+
else None
204+
)
205+
span = self._tracer.start_span(
206+
operation_name, links=links, kind=trace.SpanKind.CONSUMER
207+
)
208+
# Don't attach the context when using links to avoid parent-child relationship
209+
token = None
210+
else:
211+
token = (
212+
context_api.attach(tracectx) if tracectx is not None else None
213+
)
214+
span = self._tracer.start_span(
215+
operation_name, context=tracectx, kind=trace.SpanKind.CONSUMER
216+
)
187217

188218
activation = trace.use_span(span, end_on_exit=True)
189219
activation.__enter__() # pylint: disable=unnecessary-dunder-call

instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,63 @@ def _retrieve_context_wrapper_none_token(
240240

241241
unwrap(utils, "retrieve_context")
242242

243+
def test_task_use_span_links(self):
244+
CeleryInstrumentor().instrument(use_span_links=True)
245+
246+
result = task_add.delay(1, 2)
247+
248+
timeout = time.time() + 60 * 1 # 1 minute from now
249+
while not result.ready():
250+
if time.time() > timeout:
251+
break
252+
time.sleep(0.05)
253+
254+
spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
255+
self.assertEqual(len(spans), 2)
256+
257+
consumer, producer = spans
258+
259+
self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add")
260+
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
261+
self.assertSpanHasAttributes(
262+
consumer,
263+
{
264+
"celery.action": "run",
265+
"celery.state": "SUCCESS",
266+
SpanAttributes.MESSAGING_DESTINATION: "celery",
267+
"celery.task_name": "tests.celery_test_tasks.task_add",
268+
},
269+
)
270+
271+
self.assertEqual(consumer.status.status_code, StatusCode.UNSET)
272+
self.assertEqual(0, len(consumer.events))
273+
274+
self.assertEqual(
275+
producer.name, "apply_async/tests.celery_test_tasks.task_add"
276+
)
277+
self.assertEqual(producer.kind, SpanKind.PRODUCER)
278+
self.assertSpanHasAttributes(
279+
producer,
280+
{
281+
"celery.action": "apply_async",
282+
"celery.task_name": "tests.celery_test_tasks.task_add",
283+
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
284+
SpanAttributes.MESSAGING_DESTINATION: "celery",
285+
},
286+
)
287+
288+
# Verify that consumer span is not a child of producer span when using links
289+
self.assertIsNone(consumer.parent)
290+
self.assertNotEqual(
291+
consumer.context.trace_id, producer.context.trace_id
292+
)
293+
294+
# Verify that consumer span has a link to the producer span
295+
self.assertEqual(len(consumer.links), 1)
296+
link = consumer.links[0]
297+
self.assertEqual(link.context.span_id, producer.context.span_id)
298+
self.assertEqual(link.context.trace_id, producer.context.trace_id)
299+
243300

244301
class TestCelerySignatureTask(TestBase):
245302
def setUp(self):

tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,3 +535,36 @@ def fn_task():
535535

536536
span = spans_list[0]
537537
assert span.resource == resource
538+
539+
540+
def test_use_span_links(celery_app, tracer_provider, memory_exporter):
541+
@celery_app.task
542+
def fn_task():
543+
return 42
544+
545+
CeleryInstrumentor().uninstrument()
546+
CeleryInstrumentor().instrument(
547+
tracer_provider=tracer_provider, use_span_links=True
548+
)
549+
550+
result = fn_task.apply_async()
551+
assert result.get(timeout=ASYNC_GET_TIMEOUT) == 42
552+
553+
spans = memory_exporter.get_finished_spans()
554+
assert len(spans) == 2
555+
556+
run_span = next(
557+
s for s in spans if s.attributes.get("celery.action") == "run"
558+
)
559+
async_span = next(
560+
s for s in spans if s.attributes.get("celery.action") == "apply_async"
561+
)
562+
563+
# The run span should not be a child of the async span when using links.
564+
assert run_span.parent is None
565+
assert run_span.context.trace_id != async_span.context.trace_id
566+
567+
assert len(run_span.links) == 1
568+
link = run_span.links[0]
569+
assert link.context.span_id == async_span.context.span_id
570+
assert link.context.trace_id == async_span.context.trace_id

0 commit comments

Comments
 (0)