Skip to content

Commit 168a581

Browse files
committed
Commit
1 parent fd714f8 commit 168a581

File tree

1 file changed

+28
-15
lines changed

1 file changed

+28
-15
lines changed

drift/core/batch_processor.py

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def __init__(
6363
self._condition = threading.Condition(self._lock)
6464
self._shutdown_event = threading.Event()
6565
self._export_thread: threading.Thread | None = None
66+
self._thread_loop: asyncio.AbstractEventLoop | None = None
6667
self._started = False
6768
self._dropped_spans = 0
6869

@@ -158,16 +159,23 @@ def add_span(self, span: CleanSpanData) -> bool:
158159

159160
def _export_loop(self) -> None:
160161
"""Background thread that periodically exports spans."""
161-
while not self._shutdown_event.is_set():
162-
# Wait for either: batch size reached, scheduled delay, or shutdown
163-
with self._condition:
164-
# Wait until batch is ready or timeout
165-
self._condition.wait(timeout=self._config.scheduled_delay_seconds)
162+
# Create a single long-lived event loop for this thread
163+
self._thread_loop = asyncio.new_event_loop()
164+
asyncio.set_event_loop(self._thread_loop)
166165

167-
if self._shutdown_event.is_set():
168-
break
166+
try:
167+
while not self._shutdown_event.is_set():
168+
# Wait for either: batch size reached, scheduled delay, or shutdown
169+
with self._condition:
170+
self._condition.wait(timeout=self._config.scheduled_delay_seconds)
169171

170-
self._export_batch()
172+
if self._shutdown_event.is_set():
173+
break
174+
175+
self._export_batch()
176+
finally:
177+
self._thread_loop.close()
178+
self._thread_loop = None
171179

172180
def _export_batch(self) -> None:
173181
"""Export a batch of spans from the queue."""
@@ -188,14 +196,19 @@ def _export_batch(self) -> None:
188196
for adapter in adapters:
189197
start_time = time.monotonic()
190198
try:
191-
# Handle async adapters (create new event loop for this thread)
199+
# Handle async adapters
192200
if asyncio.iscoroutinefunction(adapter.export_spans):
193-
loop = asyncio.new_event_loop()
194-
asyncio.set_event_loop(loop)
195-
try:
196-
loop.run_until_complete(adapter.export_spans(batch))
197-
finally:
198-
loop.close()
201+
# Use the thread's long-lived event loop if available,
202+
# otherwise create a temporary one (e.g., during force_flush after shutdown)
203+
if self._thread_loop is not None and not self._thread_loop.is_closed():
204+
self._thread_loop.run_until_complete(adapter.export_spans(batch))
205+
else:
206+
loop = asyncio.new_event_loop()
207+
asyncio.set_event_loop(loop)
208+
try:
209+
loop.run_until_complete(adapter.export_spans(batch))
210+
finally:
211+
loop.close()
199212
else:
200213
adapter.export_spans(batch)
201214

0 commit comments

Comments
 (0)