@@ -283,7 +283,7 @@ def apply_async(*args: "Any", **kwargs: "Any") -> "Any":
283283
284284 span_mgr : "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr ()
285285 if span_streaming :
286- if not task_started_from_beat :
286+ if not task_started_from_beat and sentry_sdk . get_current_span () is not None :
287287 span_mgr = sentry_sdk .traces .start_span (
288288 name = task_name ,
289289 attributes = {
@@ -329,16 +329,29 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any":
329329 scope .clear_breadcrumbs ()
330330 scope .add_event_processor (_make_event_processor (task , * args , ** kwargs ))
331331
332- transaction : "Optional[Union[Span, StreamedSpan]]" = None
332+ span : "Optional[Union[Span, StreamedSpan]]" = None
333333 span_ctx : "Optional[Union[Span, StreamedSpan]]" = None
334334
335+ custom_sampling_context = {
336+ "celery_job" : {
337+ "task" : task .name ,
338+ # for some reason, args[1] is a list if non-empty but a
339+ # tuple if empty
340+ "args" : list (args [1 ]),
341+ "kwargs" : args [2 ],
342+ }
343+ }
344+
345+ scope .set_custom_sampling_context (custom_sampling_context )
346+ scope .set_transaction_name (task .name , source = TransactionSource .TASK )
347+
335348 # Celery task objects are not a thing to be trusted. Even
336349 # something such as attribute access can fail.
337350 with capture_internal_exceptions ():
338351 headers = args [3 ].get ("headers" ) or {}
339352 if span_streaming :
340353 sentry_sdk .traces .continue_trace (headers )
341- transaction = sentry_sdk .traces .start_span (
354+ span = sentry_sdk .traces .start_span (
342355 name = task .name ,
343356 attributes = {
344357 "sentry.origin" : CeleryIntegration .origin ,
@@ -347,32 +360,24 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any":
347360 },
348361 )
349362
350- span_ctx = transaction
363+ span_ctx = span
351364
352365 else :
353- transaction = continue_trace (
366+ span = continue_trace (
354367 headers ,
355368 op = OP .QUEUE_TASK_CELERY ,
356369 name = task .name ,
357370 source = TransactionSource .TASK ,
358371 origin = CeleryIntegration .origin ,
359372 )
360- transaction .set_status (SPANSTATUS .OK )
373+ span .set_status (SPANSTATUS .OK )
361374
362375 span_ctx = sentry_sdk .start_transaction (
363- transaction ,
364- custom_sampling_context = {
365- "celery_job" : {
366- "task" : task .name ,
367- # for some reason, args[1] is a list if non-empty but a
368- # tuple if empty
369- "args" : list (args [1 ]),
370- "kwargs" : args [2 ],
371- }
372- },
376+ span ,
377+ custom_sampling_context = custom_sampling_context ,
373378 )
374379
375- if transaction is None or span_ctx is None :
380+ if span is None or span_ctx is None :
376381 return f (* args , ** kwargs )
377382
378383 with span_ctx :
@@ -557,18 +562,22 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
557562 routing_key = kwargs .get ("routing_key" )
558563 exchange = kwargs .get ("exchange" )
559564
560- span : "Union[StreamedSpan, Span]"
565+ span : "Union[StreamedSpan, Span, None]" = None
561566 if span_streaming :
562- span = sentry_sdk .traces .start_span (name = task_name )
563- span .set_attribute ("sentry.op" , OP .QUEUE_PUBLISH )
564- span .set_attribute ("sentry.origin" , CeleryIntegration .origin )
567+ if sentry_sdk .get_current_span () is not None :
568+ span = sentry_sdk .traces .start_span (name = task_name )
569+ span .set_attribute ("sentry.op" , OP .QUEUE_PUBLISH )
570+ span .set_attribute ("sentry.origin" , CeleryIntegration .origin )
565571 else :
566572 span = sentry_sdk .start_span (
567573 op = OP .QUEUE_PUBLISH ,
568574 name = task_name ,
569575 origin = CeleryIntegration .origin ,
570576 )
571577
578+ if span is None :
579+ return original_publish (self , * args , ** kwargs )
580+
572581 with span :
573582 if isinstance (span , StreamedSpan ):
574583 set_on_span = span .set_attribute
0 commit comments