@@ -43,36 +43,51 @@ def __init__(
4343 self ._record_lost_func = record_lost_func
4444 self ._running = True
4545 self ._lock = threading .Lock ()
46+ self ._active : "threading.local" = threading .local ()
4647
4748 self ._flush_event : "threading.Event" = threading .Event ()
4849
4950 self ._flusher : "Optional[threading.Thread]" = None
5051 self ._flusher_pid : "Optional[int]" = None
5152
5253 def add (self , span : "StreamedSpan" ) -> None :
53- if not self ._ensure_thread () or self ._flusher is None :
54+ # Bail out if the current thread is already executing batcher code.
55+ # This prevents deadlocks when code running inside the batcher (e.g.
56+ # _add_to_envelope during flush, or _flush_event.wait/set) triggers
57+ # a GC-emitted warning that routes back through the logging
58+ # integration into add().
59+
60+ if getattr (self ._active , "flag" , False ):
5461 return None
5562
56- with self ._lock :
57- size = len (self ._span_buffer [span .trace_id ])
58- if size >= self .MAX_BEFORE_DROP :
59- self ._record_lost_func (
60- reason = "queue_overflow" ,
61- data_category = "span" ,
62- quantity = 1 ,
63- )
64- return None
65-
66- self ._span_buffer [span .trace_id ].append (span )
67- self ._running_size [span .trace_id ] += self ._estimate_size (span )
68-
69- if size + 1 >= self .MAX_BEFORE_FLUSH :
70- self ._flush_event .set ()
71- return
63+ self ._active .flag = True
7264
73- if self ._running_size [span .trace_id ] >= self .MAX_BYTES_BEFORE_FLUSH :
74- self ._flush_event .set ()
75- return
65+ if not self ._ensure_thread () or self ._flusher is None :
66+ return None
67+
68+ try :
69+ with self ._lock :
70+ size = len (self ._span_buffer [span .trace_id ])
71+ if size >= self .MAX_BEFORE_DROP :
72+ self ._record_lost_func (
73+ reason = "queue_overflow" ,
74+ data_category = "span" ,
75+ quantity = 1 ,
76+ )
77+ return None
78+
79+ self ._span_buffer [span .trace_id ].append (span )
80+ self ._running_size [span .trace_id ] += self ._estimate_size (span )
81+
82+ if size + 1 >= self .MAX_BEFORE_FLUSH :
83+ self ._flush_event .set ()
84+ return
85+
86+ if self ._running_size [span .trace_id ] >= self .MAX_BYTES_BEFORE_FLUSH :
87+ self ._flush_event .set ()
88+ return
89+ finally :
90+ self ._active .flag = False
7691
7792 @staticmethod
7893 def _estimate_size (item : "StreamedSpan" ) -> int :
0 commit comments