@@ -57,6 +57,7 @@ def __init__(
5757 self ._config = config or BatchSpanProcessorConfig ()
5858 self ._queue : deque [CleanSpanData ] = deque (maxlen = self ._config .max_queue_size )
5959 self ._lock = threading .Lock ()
60+ self ._condition = threading .Condition (self ._lock )
6061 self ._shutdown_event = threading .Event ()
6162 self ._export_thread : threading .Thread | None = None
6263 self ._started = False
@@ -88,6 +89,9 @@ def stop(self, timeout: float | None = None) -> None:
8889 return
8990
9091 self ._shutdown_event .set ()
92+ # Wake up the export thread so it can see the shutdown event
93+ with self ._condition :
94+ self ._condition .notify_all ()
9195
9296 if self ._export_thread is not None :
9397 self ._export_thread .join (timeout = timeout or self ._config .export_timeout_seconds )
@@ -108,7 +112,7 @@ def add_span(self, span: CleanSpanData) -> bool:
108112 Returns:
109113 True if span was added, False if queue is full and span was dropped
110114 """
111- with self ._lock :
115+ with self ._condition :
112116 if len (self ._queue ) >= self ._config .max_queue_size :
113117 self ._dropped_spans += 1
114118 logger .warning (
@@ -121,16 +125,17 @@ def add_span(self, span: CleanSpanData) -> bool:
121125
122126 # Trigger immediate export if batch size reached
123127 if len (self ._queue ) >= self ._config .max_export_batch_size :
124- # Signal export thread to wake up (if using condition variable)
125- pass
128+ self ._condition .notify ()
126129
127130 return True
128131
129132 def _export_loop (self ) -> None :
130133 """Background thread that periodically exports spans."""
131134 while not self ._shutdown_event .is_set ():
132- # Wait for scheduled delay or shutdown
133- self ._shutdown_event .wait (timeout = self ._config .scheduled_delay_seconds )
135+ # Wait for either: batch size reached, scheduled delay, or shutdown
136+ with self ._condition :
137+ # Wait until batch is ready or timeout
138+ self ._condition .wait (timeout = self ._config .scheduled_delay_seconds )
134139
135140 if self ._shutdown_event .is_set ():
136141 break
@@ -141,7 +146,7 @@ def _export_batch(self) -> None:
141146 """Export a batch of spans from the queue."""
142147 # Get batch of spans
143148 batch : list [CleanSpanData ] = []
144- with self ._lock :
149+ with self ._condition :
145150 while self ._queue and len (batch ) < self ._config .max_export_batch_size :
146151 batch .append (self ._queue .popleft ())
147152
@@ -171,7 +176,7 @@ def _export_batch(self) -> None:
171176 def _force_flush (self ) -> None :
172177 """Force export all remaining spans in the queue."""
173178 while True :
174- with self ._lock :
179+ with self ._condition :
175180 if not self ._queue :
176181 break
177182
@@ -180,7 +185,7 @@ def _force_flush(self) -> None:
180185 @property
181186 def queue_size (self ) -> int :
182187 """Get the current queue size."""
183- with self ._lock :
188+ with self ._condition :
184189 return len (self ._queue )
185190
186191 @property
0 commit comments