Skip to content

Commit 5c20a0e

Browse files
committed
feat(celery): Support span streaming
1 parent bc5759c commit 5c20a0e

File tree

1 file changed

+140
-64
lines changed

1 file changed

+140
-64
lines changed

sentry_sdk/integrations/celery/__init__.py

Lines changed: 140 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
)
1515
from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch
1616
from sentry_sdk.integrations.logging import ignore_logger
17+
from sentry_sdk.traces import StreamedSpan
1718
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, Span, TransactionSource
18-
from sentry_sdk.tracing_utils import Baggage
19+
from sentry_sdk.tracing_utils import Baggage, has_span_streaming_enabled
1920
from sentry_sdk.utils import (
2021
capture_internal_exceptions,
21-
ensure_integration_enabled,
2222
event_from_exception,
2323
reraise,
2424
)
@@ -162,7 +162,9 @@ def event_processor(event: "Event", hint: "Hint") -> "Optional[Event]":
162162

163163

164164
def _update_celery_task_headers(
165-
original_headers: "dict[str, Any]", span: "Optional[Span]", monitor_beat_tasks: bool
165+
original_headers: "dict[str, Any]",
166+
span: "Optional[Union[StreamedSpan, Span]]",
167+
monitor_beat_tasks: bool,
166168
) -> "dict[str, Any]":
167169
"""
168170
Updates the headers of the Celery task with the tracing information
@@ -255,7 +257,8 @@ def _wrap_task_run(f: "F") -> "F":
255257
def apply_async(*args: "Any", **kwargs: "Any") -> "Any":
256258
# Note: kwargs can contain headers=None, so no setdefault!
257259
# Unsure which backend though.
258-
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
260+
client = sentry_sdk.get_client()
261+
integration = client.get_integration(CeleryIntegration)
259262
if integration is None:
260263
return f(*args, **kwargs)
261264

@@ -274,17 +277,28 @@ def apply_async(*args: "Any", **kwargs: "Any") -> "Any":
274277
else:
275278
task_name = "<unknown Celery task>"
276279

280+
span_streaming = has_span_streaming_enabled(client.options)
281+
277282
task_started_from_beat = sentry_sdk.get_isolation_scope()._name == "celery-beat"
278283

279-
span_mgr: "Union[Span, NoOpMgr]" = (
280-
sentry_sdk.start_span(
281-
op=OP.QUEUE_SUBMIT_CELERY,
282-
name=task_name,
283-
origin=CeleryIntegration.origin,
284-
)
285-
if not task_started_from_beat
286-
else NoOpMgr()
287-
)
284+
span_mgr: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr()
285+
if span_streaming:
286+
if not task_started_from_beat:
287+
span_mgr = sentry_sdk.traces.start_span(
288+
name=task_name,
289+
attributes={
290+
"sentry.op": OP.QUEUE_SUBMIT_CELERY,
291+
"sentry.origin": CeleryIntegration.origin,
292+
},
293+
)
294+
295+
else:
296+
if not task_started_from_beat:
297+
span_mgr = sentry_sdk.start_span(
298+
op=OP.QUEUE_SUBMIT_CELERY,
299+
name=task_name,
300+
origin=CeleryIntegration.origin,
301+
)
288302

289303
with span_mgr as span:
290304
kwargs["headers"] = _update_celery_task_headers(
@@ -303,50 +317,73 @@ def _wrap_tracer(task: "Any", f: "F") -> "F":
303317
# Also because in Celery 3, signal dispatch returns early if one handler
304318
# crashes.
305319
@wraps(f)
306-
@ensure_integration_enabled(CeleryIntegration, f)
307320
def _inner(*args: "Any", **kwargs: "Any") -> "Any":
321+
client = sentry_sdk.get_client()
322+
if client.get_integration(CeleryIntegration) is None:
323+
return f(*args, **kwargs)
324+
325+
span_streaming = has_span_streaming_enabled(client.options)
326+
308327
with isolation_scope() as scope:
309328
scope._name = "celery"
310329
scope.clear_breadcrumbs()
311330
scope.add_event_processor(_make_event_processor(task, *args, **kwargs))
312331

313-
transaction = None
332+
transaction: "Optional[Union[Span, StreamedSpan]]" = None
333+
span_ctx: "Optional[Union[Span, StreamedSpan]]" = None
314334

315335
# Celery task objects are not a thing to be trusted. Even
316336
# something such as attribute access can fail.
317337
with capture_internal_exceptions():
318338
headers = args[3].get("headers") or {}
319-
transaction = continue_trace(
320-
headers,
321-
op=OP.QUEUE_TASK_CELERY,
322-
name="unknown celery task",
323-
source=TransactionSource.TASK,
324-
origin=CeleryIntegration.origin,
325-
)
326-
transaction.name = task.name
327-
transaction.set_status(SPANSTATUS.OK)
339+
if span_streaming:
340+
sentry_sdk.traces.continue_trace(headers)
341+
transaction = sentry_sdk.traces.start_span(
342+
name=task.name,
343+
attributes={
344+
"sentry.origin": CeleryIntegration.origin,
345+
"sentry.span.source": TransactionSource.TASK.value,
346+
"sentry.op": OP.QUEUE_TASK_CELERY,
347+
},
348+
)
328349

329-
if transaction is None:
350+
span_ctx = transaction
351+
352+
else:
353+
transaction = continue_trace(
354+
headers,
355+
op=OP.QUEUE_TASK_CELERY,
356+
name=task.name,
357+
source=TransactionSource.TASK,
358+
origin=CeleryIntegration.origin,
359+
)
360+
transaction.set_status(SPANSTATUS.OK)
361+
362+
span_ctx = sentry_sdk.start_transaction(
363+
transaction,
364+
custom_sampling_context={
365+
"celery_job": {
366+
"task": task.name,
367+
# for some reason, args[1] is a list if non-empty but a
368+
# tuple if empty
369+
"args": list(args[1]),
370+
"kwargs": args[2],
371+
}
372+
},
373+
)
374+
375+
if transaction is None or span_ctx is None:
330376
return f(*args, **kwargs)
331377

332-
with sentry_sdk.start_transaction(
333-
transaction,
334-
custom_sampling_context={
335-
"celery_job": {
336-
"task": task.name,
337-
# for some reason, args[1] is a list if non-empty but a
338-
# tuple if empty
339-
"args": list(args[1]),
340-
"kwargs": args[2],
341-
}
342-
},
343-
):
378+
with span_ctx:
344379
return f(*args, **kwargs)
345380

346381
return _inner # type: ignore
347382

348383

349-
def _set_messaging_destination_name(task: "Any", span: "Span") -> None:
384+
def _set_messaging_destination_name(
385+
task: "Any", span: "Union[StreamedSpan, Span]"
386+
) -> None:
350387
"""Set "messaging.destination.name" tag for span"""
351388
with capture_internal_exceptions():
352389
delivery_info = task.request.delivery_info
@@ -355,26 +392,43 @@ def _set_messaging_destination_name(task: "Any", span: "Span") -> None:
355392
if delivery_info.get("exchange") == "" and routing_key is not None:
356393
# Empty exchange indicates the default exchange, meaning the tasks
357394
# are sent to the queue with the same name as the routing key.
358-
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
395+
if isinstance(span, StreamedSpan):
396+
span.set_attribute(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
397+
else:
398+
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
359399

360400

361401
def _wrap_task_call(task: "Any", f: "F") -> "F":
362402
# Need to wrap task call because the exception is caught before we get to
363403
# see it. Also celery's reported stacktrace is untrustworthy.
364404

365-
# functools.wraps is important here because celery-once looks at this
366-
# method's name. @ensure_integration_enabled internally calls functools.wraps,
367-
# but if we ever remove the @ensure_integration_enabled decorator, we need
368-
# to add @functools.wraps(f) here.
369-
# https://github.com/getsentry/sentry-python/issues/421
370-
@ensure_integration_enabled(CeleryIntegration, f)
405+
@wraps(f)
371406
def _inner(*args: "Any", **kwargs: "Any") -> "Any":
407+
client = sentry_sdk.get_client()
408+
if client.get_integration(CeleryIntegration) is None:
409+
return f(*args, **kwargs)
410+
411+
span_streaming = has_span_streaming_enabled(client.options)
412+
372413
try:
373-
with sentry_sdk.start_span(
374-
op=OP.QUEUE_PROCESS,
375-
name=task.name,
376-
origin=CeleryIntegration.origin,
377-
) as span:
414+
span: "Union[Span, StreamedSpan]"
415+
if span_streaming:
416+
span = sentry_sdk.traces.start_span(name=task.name)
417+
span.set_attribute("sentry.op", OP.QUEUE_PROCESS)
418+
span.set_attribute("sentry.origin", CeleryIntegration.origin)
419+
else:
420+
span = sentry_sdk.start_span(
421+
op=OP.QUEUE_PROCESS,
422+
name=task.name,
423+
origin=CeleryIntegration.origin,
424+
)
425+
426+
with span:
427+
if isinstance(span, StreamedSpan):
428+
set_on_span = span.set_attribute
429+
else:
430+
set_on_span = span.set_data
431+
378432
_set_messaging_destination_name(task, span)
379433

380434
latency = None
@@ -389,19 +443,19 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any":
389443

390444
if latency is not None:
391445
latency *= 1000 # milliseconds
392-
span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
446+
set_on_span(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
393447

394448
with capture_internal_exceptions():
395-
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
449+
set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
396450

397451
with capture_internal_exceptions():
398-
span.set_data(
452+
set_on_span(
399453
SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
400454
)
401455

402456
with capture_internal_exceptions():
403457
with task.app.connection() as conn:
404-
span.set_data(
458+
set_on_span(
405459
SPANDATA.MESSAGING_SYSTEM,
406460
conn.transport.driver_type,
407461
)
@@ -476,8 +530,13 @@ def sentry_workloop(*args: "Any", **kwargs: "Any") -> "Any":
476530
def _patch_producer_publish() -> None:
477531
original_publish = Producer.publish
478532

479-
@ensure_integration_enabled(CeleryIntegration, original_publish)
480533
def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
534+
client = sentry_sdk.get_client()
535+
if client.get_integration(CeleryIntegration) is None:
536+
return original_publish(self, *args, **kwargs)
537+
538+
span_streaming = has_span_streaming_enabled(client.options)
539+
481540
kwargs_headers = kwargs.get("headers", {})
482541
if not isinstance(kwargs_headers, Mapping):
483542
# Ensure kwargs_headers is a Mapping, so we can safely call get().
@@ -487,31 +546,48 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
487546
# method will still work.
488547
kwargs_headers = {}
489548

549+
if "task" not in kwargs_headers:
550+
# filter out heartbeat and other internal Celery events
551+
return original_publish(self, *args, **kwargs)
552+
490553
task_name = kwargs_headers.get("task")
491554
task_id = kwargs_headers.get("id")
492555
retries = kwargs_headers.get("retries")
493556

494557
routing_key = kwargs.get("routing_key")
495558
exchange = kwargs.get("exchange")
496559

497-
with sentry_sdk.start_span(
498-
op=OP.QUEUE_PUBLISH,
499-
name=task_name,
500-
origin=CeleryIntegration.origin,
501-
) as span:
560+
span: "Union[StreamedSpan, Span]"
561+
if span_streaming:
562+
span = sentry_sdk.traces.start_span(name=task_name)
563+
span.set_attribute("sentry.op", OP.QUEUE_PUBLISH)
564+
span.set_attribute("sentry.origin", CeleryIntegration.origin)
565+
else:
566+
span = sentry_sdk.start_span(
567+
op=OP.QUEUE_PUBLISH,
568+
name=task_name,
569+
origin=CeleryIntegration.origin,
570+
)
571+
572+
with span:
573+
if isinstance(span, StreamedSpan):
574+
set_on_span = span.set_attribute
575+
else:
576+
set_on_span = span.set_data
577+
502578
if task_id is not None:
503-
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id)
579+
set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task_id)
504580

505581
if exchange == "" and routing_key is not None:
506582
# Empty exchange indicates the default exchange, meaning messages are
507583
# routed to the queue with the same name as the routing key.
508-
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
584+
set_on_span(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
509585

510586
if retries is not None:
511-
span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)
587+
set_on_span(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)
512588

513589
with capture_internal_exceptions():
514-
span.set_data(
590+
set_on_span(
515591
SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type
516592
)
517593

0 commit comments

Comments
 (0)