Skip to content

Commit 96091f5

Browse files
AAgnihotryclaude
andauthored
fix: make LiveTrackingSpanProcessor upsert calls non-blocking (#1116)
Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 34511d9 commit 96091f5

File tree

4 files changed

+254
-17
lines changed

4 files changed

+254
-17
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath"
3-
version = "2.4.22"
3+
version = "2.4.23"
44
description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"

src/uipath/_cli/_evals/_runtime.py

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import uuid
44
from collections import defaultdict
5+
from concurrent.futures import ThreadPoolExecutor
56
from contextlib import contextmanager
67
from pathlib import Path
78
from time import time
@@ -166,33 +167,59 @@ class LiveTrackingSpanProcessor(SpanProcessor):
166167
Sends real-time span updates:
167168
- On span start: Upsert with RUNNING status
168169
- On span end: Upsert with final status (OK/ERROR)
170+
171+
All upsert calls run in background threads without blocking evaluation
172+
execution. Uses a thread pool to cap the maximum number of concurrent
173+
threads and avoid resource exhaustion.
169174
"""
170175

171-
def __init__(self, exporter: LlmOpsHttpExporter):
176+
def __init__(
177+
self,
178+
exporter: LlmOpsHttpExporter,
179+
max_workers: int = 10,
180+
):
172181
self.exporter = exporter
173182
self.span_status = SpanStatus
183+
self.executor = ThreadPoolExecutor(
184+
max_workers=max_workers, thread_name_prefix="span-upsert"
185+
)
186+
187+
def _upsert_span_async(
188+
self, span: Span | ReadableSpan, status_override: int | None = None
189+
) -> None:
190+
"""Run upsert_span in a background thread without blocking.
191+
192+
Submits the upsert task to the thread pool and returns immediately.
193+
The thread pool handles execution with max_workers cap to prevent
194+
resource exhaustion.
195+
"""
196+
197+
def _upsert():
198+
try:
199+
if status_override:
200+
self.exporter.upsert_span(span, status_override=status_override)
201+
else:
202+
self.exporter.upsert_span(span)
203+
except Exception as e:
204+
logger.debug(f"Failed to upsert span: {e}")
205+
206+
# Submit to thread pool and return immediately (non-blocking)
207+
# The timeout parameter is reserved for shutdown operations
208+
self.executor.submit(_upsert)
174209

175210
def on_start(
176211
self, span: Span, parent_context: context_api.Context | None = None
177212
) -> None:
178-
"""Called when span starts - upsert with RUNNING status."""
213+
"""Called when span starts - upsert with RUNNING status (non-blocking)."""
179214
# Only track evaluation-related spans
180215
if span.attributes and self._is_eval_span(span):
181-
try:
182-
self.exporter.upsert_span(
183-
span, status_override=self.span_status.RUNNING
184-
)
185-
except Exception as e:
186-
logger.debug(f"Failed to upsert span on start: {e}")
216+
self._upsert_span_async(span, status_override=self.span_status.RUNNING)
187217

188218
def on_end(self, span: ReadableSpan) -> None:
189-
"""Called when span ends - upsert with final status."""
219+
"""Called when span ends - upsert with final status (non-blocking)."""
190220
# Only track evaluation-related spans
191221
if span.attributes and self._is_eval_span(span):
192-
try:
193-
self.exporter.upsert_span(span)
194-
except Exception as e:
195-
logger.debug(f"Failed to upsert span on end: {e}")
222+
self._upsert_span_async(span)
196223

197224
def _is_eval_span(self, span: Span | ReadableSpan) -> bool:
198225
"""Check if span is evaluation-related."""
@@ -219,8 +246,11 @@ def _is_eval_span(self, span: Span | ReadableSpan) -> bool:
219246
return False
220247

221248
def shutdown(self) -> None:
222-
"""Shutdown the processor."""
223-
pass
249+
"""Shutdown the processor and wait for pending tasks to complete."""
250+
try:
251+
self.executor.shutdown(wait=True)
252+
except Exception as e:
253+
logger.debug(f"Executor shutdown failed: {e}")
224254

225255
def force_flush(self, timeout_millis: int = 30000) -> bool:
226256
"""Force flush - no-op for live tracking."""

tests/cli/eval/test_live_tracking_span_processor.py

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Tests for LiveTrackingSpanProcessor in _runtime.py."""
22

3+
import threading
4+
import time
35
from typing import Any
46
from unittest.mock import Mock
57

@@ -290,3 +292,208 @@ def test_processor_handles_all_eval_span_types(self, processor):
290292
assert processor._is_eval_span(span) is True, (
291293
f"Failed for span_type: {span_type}"
292294
)
295+
296+
# Tests for ThreadPoolExecutor behavior
297+
298+
def test_thread_pool_executor_used(self, mock_exporter):
299+
"""Test that processor uses ThreadPoolExecutor for async operations."""
300+
processor = LiveTrackingSpanProcessor(mock_exporter)
301+
span = self.create_mock_span({"span_type": "eval"})
302+
303+
# Verify executor exists
304+
assert hasattr(processor, "executor")
305+
assert processor.executor is not None
306+
307+
# Submit task and verify it's non-blocking
308+
start_time = time.time()
309+
processor.on_start(span, None)
310+
elapsed = time.time() - start_time
311+
312+
# Should return immediately (< 0.05 seconds)
313+
assert elapsed < 0.05, f"on_start blocked for {elapsed} seconds"
314+
315+
def test_handles_exceptions_gracefully(self, mock_exporter):
316+
"""Test that exceptions in background threads don't crash."""
317+
mock_exporter.upsert_span = Mock(side_effect=Exception("Network error"))
318+
processor = LiveTrackingSpanProcessor(mock_exporter)
319+
span = self.create_mock_span({"span_type": "eval"})
320+
321+
# Should not raise exception
322+
processor.on_start(span, None)
323+
# Wait for background thread to process
324+
time.sleep(0.2)
325+
326+
# Main thread should still be alive
327+
assert threading.current_thread().is_alive()
328+
329+
def test_upsert_span_async_with_status_override(self, mock_exporter):
330+
"""Test _upsert_span_async correctly passes status_override."""
331+
processor = LiveTrackingSpanProcessor(mock_exporter)
332+
span = self.create_mock_span({"span_type": "eval"})
333+
334+
processor._upsert_span_async(span, status_override=SpanStatus.RUNNING)
335+
336+
# Wait for background thread to complete
337+
time.sleep(0.2)
338+
339+
mock_exporter.upsert_span.assert_called_once_with(
340+
span, status_override=SpanStatus.RUNNING
341+
)
342+
343+
def test_upsert_span_async_without_status_override(self, mock_exporter):
344+
"""Test _upsert_span_async without status_override."""
345+
processor = LiveTrackingSpanProcessor(mock_exporter)
346+
span = self.create_mock_readable_span({"span_type": "eval"})
347+
348+
processor._upsert_span_async(span, status_override=None)
349+
350+
# Wait for background thread to complete
351+
time.sleep(0.2)
352+
353+
mock_exporter.upsert_span.assert_called_once_with(span)
354+
355+
def test_processor_initialization_with_custom_max_workers(self, mock_exporter):
356+
"""Test processor can be initialized with custom max_workers."""
357+
processor = LiveTrackingSpanProcessor(mock_exporter, max_workers=15)
358+
assert processor.executor._max_workers == 15
359+
360+
def test_exception_in_background_thread_does_not_crash(self, mock_exporter):
361+
"""Test that exceptions in background threads don't crash the main thread."""
362+
mock_exporter.upsert_span = Mock(side_effect=Exception("Background error"))
363+
processor = LiveTrackingSpanProcessor(mock_exporter)
364+
span = self.create_mock_span({"span_type": "eval"})
365+
366+
# Should not raise exception
367+
processor.on_start(span, None)
368+
time.sleep(0.2) # Wait for background thread
369+
370+
# Main thread should still be alive
371+
assert threading.current_thread().is_alive()
372+
373+
# Tests for ThreadPoolExecutor and max_workers
374+
375+
def test_processor_with_custom_max_workers(self, mock_exporter):
376+
"""Test processor can be initialized with custom max_workers."""
377+
processor = LiveTrackingSpanProcessor(mock_exporter, max_workers=20)
378+
assert processor.executor._max_workers == 20
379+
380+
def test_processor_default_max_workers(self, mock_exporter):
381+
"""Test processor uses default max_workers of 10."""
382+
processor = LiveTrackingSpanProcessor(mock_exporter)
383+
assert processor.executor._max_workers == 10
384+
385+
def test_thread_pool_caps_concurrent_threads(self, mock_exporter):
386+
"""Test that thread pool caps concurrent threads to max_workers."""
387+
concurrent_calls = []
388+
max_concurrent = 0
389+
390+
def slow_upsert(*args, **kwargs):
391+
concurrent_calls.append(1)
392+
nonlocal max_concurrent
393+
max_concurrent = max(max_concurrent, len(concurrent_calls))
394+
time.sleep(0.5)
395+
concurrent_calls.pop()
396+
397+
mock_exporter.upsert_span = Mock(side_effect=slow_upsert)
398+
processor = LiveTrackingSpanProcessor(mock_exporter, max_workers=3)
399+
400+
# Submit 10 tasks rapidly
401+
spans = [
402+
self.create_mock_span({"span_type": "eval", "id": str(i)})
403+
for i in range(10)
404+
]
405+
406+
for span in spans:
407+
processor.on_start(span, None)
408+
409+
# Wait for all to complete
410+
time.sleep(2)
411+
412+
# Max concurrent should not exceed max_workers (3)
413+
assert max_concurrent <= 3, (
414+
f"Max concurrent was {max_concurrent}, expected <= 3"
415+
)
416+
417+
def test_shutdown_waits_for_pending_tasks(self, mock_exporter):
418+
"""Test that shutdown properly cleans up the thread pool."""
419+
processor = LiveTrackingSpanProcessor(mock_exporter, max_workers=2)
420+
421+
# Submit some tasks
422+
for i in range(3):
423+
span = self.create_mock_span({"span_type": "eval", "id": str(i)})
424+
processor.on_start(span, None)
425+
426+
# Shutdown should complete without errors
427+
processor.shutdown()
428+
429+
# Verify executor is shutdown (calling shutdown multiple times should be safe)
430+
processor.shutdown() # Should not raise
431+
432+
def test_multiple_processors_independent_thread_pools(self, mock_exporter):
433+
"""Test that multiple processors have independent thread pools."""
434+
processor1 = LiveTrackingSpanProcessor(mock_exporter, max_workers=5)
435+
processor2 = LiveTrackingSpanProcessor(mock_exporter, max_workers=15)
436+
437+
assert processor1.executor != processor2.executor
438+
assert processor1.executor._max_workers == 5
439+
assert processor2.executor._max_workers == 15
440+
441+
def test_thread_pool_name_prefix(self, mock_exporter):
442+
"""Test that thread pool uses correct name prefix."""
443+
processor = LiveTrackingSpanProcessor(mock_exporter)
444+
# ThreadPoolExecutor sets _thread_name_prefix
445+
assert processor.executor._thread_name_prefix == "span-upsert"
446+
447+
def test_resource_exhaustion_prevention(self, mock_exporter):
448+
"""Test that max_workers prevents resource exhaustion."""
449+
call_times = []
450+
451+
def timed_upsert(*args, **kwargs):
452+
call_times.append(time.time())
453+
time.sleep(0.3)
454+
455+
mock_exporter.upsert_span = Mock(side_effect=timed_upsert)
456+
# Very low max_workers to test queueing
457+
processor = LiveTrackingSpanProcessor(mock_exporter, max_workers=2)
458+
459+
# Submit 6 tasks
460+
for i in range(6):
461+
span = self.create_mock_span({"span_type": "eval", "id": str(i)})
462+
processor.on_start(span, None)
463+
464+
# Wait for all to complete
465+
time.sleep(2)
466+
467+
# All 6 should complete
468+
assert len(call_times) == 6
469+
470+
# With max_workers=2 and 0.3s per task, we should see batching
471+
# Sort by time to analyze execution pattern
472+
call_times.sort()
473+
# First 2 should start quickly, next batch should wait
474+
assert call_times[1] - call_times[0] < 0.2 # First batch starts together
475+
assert (
476+
call_times[3] - call_times[1] > 0.2
477+
) # Second batch waits for first to finish
478+
479+
def test_shutdown_can_be_called_multiple_times(self, mock_exporter):
480+
"""Test that shutdown can be safely called multiple times."""
481+
processor = LiveTrackingSpanProcessor(mock_exporter)
482+
span = self.create_mock_span({"span_type": "eval"})
483+
484+
processor.on_start(span, None)
485+
time.sleep(0.1)
486+
487+
# Multiple shutdowns should not raise exceptions
488+
processor.shutdown()
489+
processor.shutdown()
490+
processor.shutdown()
491+
492+
def test_executor_properly_initialized(self, mock_exporter):
493+
"""Test that ThreadPoolExecutor is properly initialized."""
494+
processor = LiveTrackingSpanProcessor(mock_exporter, max_workers=7)
495+
496+
assert processor.executor is not None
497+
assert hasattr(processor.executor, "submit")
498+
assert hasattr(processor.executor, "shutdown")
499+
assert processor.executor._max_workers == 7

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)