Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions drift/core/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
self._condition = threading.Condition(self._lock)
self._shutdown_event = threading.Event()
self._export_thread: threading.Thread | None = None
self._thread_loop: asyncio.AbstractEventLoop | None = None
self._started = False
self._dropped_spans = 0

Expand Down Expand Up @@ -158,16 +159,23 @@ def add_span(self, span: CleanSpanData) -> bool:

def _export_loop(self) -> None:
"""Background thread that periodically exports spans."""
while not self._shutdown_event.is_set():
# Wait for either: batch size reached, scheduled delay, or shutdown
with self._condition:
# Wait until batch is ready or timeout
self._condition.wait(timeout=self._config.scheduled_delay_seconds)
# Create a single long-lived event loop for this thread
self._thread_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._thread_loop)

if self._shutdown_event.is_set():
break
try:
while not self._shutdown_event.is_set():
# Wait for either: batch size reached, scheduled delay, or shutdown
with self._condition:
self._condition.wait(timeout=self._config.scheduled_delay_seconds)

self._export_batch()
if self._shutdown_event.is_set():
break

self._export_batch()
finally:
self._thread_loop.close()
self._thread_loop = None

def _export_batch(self) -> None:
"""Export a batch of spans from the queue."""
Expand All @@ -188,14 +196,19 @@ def _export_batch(self) -> None:
for adapter in adapters:
start_time = time.monotonic()
try:
# Handle async adapters (create new event loop for this thread)
# Handle async adapters
if asyncio.iscoroutinefunction(adapter.export_spans):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(adapter.export_spans(batch))
finally:
loop.close()
# Use the thread's long-lived event loop if available,
# otherwise create a temporary one (e.g., during force_flush after shutdown)
if self._thread_loop is not None and not self._thread_loop.is_closed():
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
Outdated
self._thread_loop.run_until_complete(adapter.export_spans(batch))
else:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(adapter.export_spans(batch))
finally:
loop.close()
else:
adapter.export_spans(batch)

Expand Down