This document describes the architecture changes for multi-threaded evaluation processing in the Splunk OpenTelemetry GenAI instrumentation.
The evaluation system processes LLM invocations through evaluators (such as DeepEval) to generate quality metrics like Bias, Toxicity, Answer Relevancy, and Faithfulness. These evaluations involve LLM-as-a-Judge calls, which are I/O-bound and therefore benefit significantly from parallel processing.
┌─────────────────────────────────────────────────────────────────────┐
│ Application │
│ │
│ LLM Call ──► TelemetryHandler ──► CompletionCallback │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Queue │ │
│ │ (unbounded) │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────┐ │
│ │ Single Worker Thread │ │
│ │ │ │
│ │ ┌──────────────────┐ │ │
│ │ │ Process Item 1 │ │ │
│ │ │ (sync LLM call) │ │ │
│ │ └────────┬─────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────┐ │ │
│ │ │ Process Item 2 │ │ │
│ │ │ (sync LLM call) │ │ │
│ │ └────────┬─────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ... │ │
│ └────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Problems:
- Single thread processes one evaluation at a time
- I/O-bound LLM calls block the worker
- Long queue drain times (5+ minutes for a batch)
- Sequential processing wastes time waiting for API responses
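The drain-time problem is easy to quantify. A back-of-envelope sketch (the batch size and per-evaluation latency below are illustrative assumptions, not measured values):

```python
# Sequential worker: every evaluation waits for the previous one.
# Illustrative assumptions: 100 queued invocations, ~3.5 s of
# LLM-as-a-Judge round trips per invocation.
queued_invocations = 100
seconds_per_eval = 3.5

sequential_drain = queued_invocations * seconds_per_eval
print(f"{sequential_drain:.0f} s (~{sequential_drain / 60:.1f} min)")  # → 350 s (~5.8 min)
```

With these numbers a single worker takes nearly six minutes to drain the queue, consistent with the "5+ minutes" observation above.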
┌─────────────────────────────────────────────────────────────────────────────┐
│ Application │
│ │
│ LLM Call ──► TelemetryHandler ──► CompletionCallback │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Bounded Queue │ │
│ │ (configurable) │ │
│ └────────┬────────┘ │
│ │ │
│ ┌────────────────────────────┼────────────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────┐ │
│ │ Worker Thread 1 │ │ Worker Thread 2 │ │ Worker Thread N│ │
│ │ │ │ │ │ │ │
│ │ ┌─────────────────┐ │ │ ┌─────────────────┐ │ │ ┌─────────────┐ │ │
│ │ │ asyncio loop │ │ │ │ asyncio loop │ │ │ │asyncio loop │ │ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ │ ┌─────────────┐ │ │ │ │ ┌─────────────┐ │ │ │ │ ┌─────────┐ │ │ │
│ │ │ │ async eval │ │ │ │ │ │ async eval │ │ │ │ │ │async │ │ │ │
│ │ │ │ (Item A) │ │ │ │ │ │ (Item B) │ │ │ │ │ │eval │ │ │ │
│ │ │ └─────────────┘ │ │ │ │ └─────────────┘ │ │ │ │ │(Item C) │ │ │ │
│ │ └─────────────────┘ │ │ └─────────────────┘ │ │ │ └─────────┘ │ │ │
│ └─────────────────────┘ └─────────────────────┘ │ └─────────────┘ │ │
│ └─────────────────┘ │
│ │
│ Benefits: │
│ - Multiple workers process items in parallel │
│ - Each worker has its own asyncio event loop │
│ - Async LLM calls don't block other workers │
│ - Bounded queue provides backpressure │
│ - ~13x faster throughput │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
The Manager class orchestrates evaluation processing:
```python
class Manager:
    def __init__(self):
        # Configuration from environment
        self._concurrent_mode = read_concurrent_flag()  # OTEL_INSTRUMENTATION_GENAI_EVALS_CONCURRENT
        self._worker_count = read_worker_count()        # OTEL_INSTRUMENTATION_GENAI_EVALS_WORKERS
        queue_size = read_queue_size()                  # OTEL_INSTRUMENTATION_GENAI_EVALS_QUEUE_SIZE

        # Bounded or unbounded queue
        self._queue = Queue(maxsize=queue_size) if queue_size > 0 else Queue()

        # Worker pool
        self._workers = []
        if self._concurrent_mode:
            # Start N worker threads, each with its own asyncio loop
            for _ in range(self._worker_count):
                worker = Thread(target=self._concurrent_worker_loop, daemon=True)
                worker.start()
                self._workers.append(worker)
        else:
            # Single sequential worker (backward compatible)
            self._worker_count = 1
            worker = Thread(target=self._worker_loop, daemon=True)
            worker.start()
            self._workers.append(worker)
```

Key Methods:
| Method | Purpose |
|---|---|
| `_concurrent_worker_loop()` | Creates an asyncio event loop per worker |
| `_async_worker_task()` | Async task that pulls from the queue |
| `_process_invocation_async()` | Async evaluation processing |
| `_evaluate_invocation_async()` | Runs evaluators concurrently |
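The loop-per-thread pattern behind `_concurrent_worker_loop()` and `_async_worker_task()` can be sketched as below; everything apart from those two names is a simplified stand-in for the real Manager internals (the doubling step substitutes for an actual evaluation):

```python
import asyncio
import queue
import threading

work_queue: "queue.Queue[int | None]" = queue.Queue()
results: list[int] = []
results_lock = threading.Lock()

async def _async_worker_task() -> None:
    """Pull items from the thread-safe queue until a None sentinel arrives."""
    while True:
        # Blocking get() runs in a helper thread so the event loop stays responsive.
        item = await asyncio.to_thread(work_queue.get)
        if item is None:
            work_queue.task_done()
            return
        await asyncio.sleep(0)  # stand-in for the async evaluation call
        with results_lock:
            results.append(item * 2)
        work_queue.task_done()

def _concurrent_worker_loop() -> None:
    """Each worker thread owns a dedicated asyncio event loop."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(_async_worker_task())
    finally:
        loop.close()

workers = [threading.Thread(target=_concurrent_worker_loop, daemon=True) for _ in range(3)]
for w in workers:
    w.start()
for item in range(6):
    work_queue.put(item)
for _ in workers:
    work_queue.put(None)  # one shutdown sentinel per worker
for w in workers:
    w.join()
print(sorted(results))  # → [0, 2, 4, 6, 8, 10]
```

Because each thread creates and closes its own loop, no two workers ever share event-loop state, which is what makes the per-worker async design thread-safe.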
The Evaluator base class now supports async methods:
```python
class Evaluator:
    @property
    def supports_async(self) -> bool:
        """Override to return True if evaluator has native async support."""
        return False

    async def evaluate_async(self, item: GenAI) -> list[EvaluationResult]:
        """Async evaluation entry point."""
        if isinstance(item, LLMInvocation):
            return list(await self.evaluate_llm_async(item))
        if isinstance(item, AgentInvocation):
            return list(await self.evaluate_agent_async(item))
        return []

    async def evaluate_llm_async(self, invocation: LLMInvocation):
        """Default: run sync method in thread pool."""
        return await asyncio.to_thread(self.evaluate_llm, invocation)
```

The DeepevalEvaluator implements native async support:
```python
class DeepevalEvaluator(Evaluator):
    @property
    def supports_async(self) -> bool:
        return True  # Native async support

    async def evaluate_llm_async(self, invocation: LLMInvocation):
        # Build test case and metrics
        test_case = self._build_test_case(invocation)
        metrics = self._build_metrics()
        # Run DeepEval asynchronously
        result = await run_evaluation_async(test_case, metrics)
        return self._convert_results(result)
```

The runner handles DeepEval's internal async configuration:
```python
async def run_evaluation_async(test_case, metrics, debug_log=None):
    """Run DeepEval evaluation asynchronously."""
    # Enable DeepEval's internal async mode when concurrent mode is active
    use_deepeval_async = _is_async_mode_enabled()

    def _run_sync():
        async_config = AsyncConfig(
            run_async=use_deepeval_async,  # Parallel metric evaluation
            max_concurrent=10,             # Up to 10 metrics in parallel
        )
        return deepeval_evaluate(
            [test_case],
            metrics,
            async_config=async_config,
            display_config=DisplayConfig(show_indicator=False),
        )

    # Run in a thread pool so we don't block the event loop
    return await asyncio.to_thread(_run_sync)
```

We leverage DeepEval's built-in AsyncConfig to enable parallel metric evaluation within each invocation. This is documented in DeepEval's Async Configs documentation.
When OTEL_INSTRUMENTATION_GENAI_EVALS_CONCURRENT=true, we enable DeepEval's internal async mode:
```python
from deepeval.evaluate import AsyncConfig

async_config = AsyncConfig(
    run_async=True,     # Enable concurrent evaluation of test cases AND metrics
    max_concurrent=10,  # Maximum parallel metrics at any point in time
    throttle_value=0,   # No throttling (can be increased for rate-limited APIs)
)
```

| Parameter | Our Value | Description |
|---|---|---|
| `run_async` | `True` (when concurrent mode enabled) | Enables concurrent evaluation of test cases AND metrics |
| `max_concurrent` | `10` | Maximum number of metrics that can run in parallel |
| `throttle_value` | `0` | Seconds to wait between metric evaluations (for rate limiting) |
Our architecture provides two levels of concurrent processing:
┌─────────────────────────────────────────────────────────────────────────────┐
│ LEVEL 1: Worker Thread Parallelism │
│ (Our Implementation - Manager) │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Worker 1 │ │ Worker 2 │ │ Worker 3 │ │ Worker 4 │ │
│ │ Invocation A│ │ Invocation B│ │ Invocation C│ │ Invocation D│ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ LEVEL 2: Metric Parallelism │ │
│ │ (DeepEval AsyncConfig) │ │
│ │ │ │
│ │ Within each invocation, metrics run concurrently: │ │
│ │ │ │
│ │ ┌─────────┐ ┌───────────┐ ┌──────────────────┐ ┌────────────┐ │ │
│ │ │ Bias │ │ Toxicity │ │ Answer Relevancy │ │Faithfulness│ │ │
│ │ │ (LLM) │ │ (LLM) │ │ (LLM) │ │ (LLM) │ │ │
│ │ └─────────┘ └───────────┘ └──────────────────┘ └────────────┘ │ │
│ │ │ │ │ │ │ │
│ │ └────────────┴────────────────┴────────────────────┘ │ │
│ │ │ │ │
│ │ All run in parallel │ │
│ │ (up to max_concurrent=10) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Level 1 (Our Implementation):
- Multiple worker threads process different invocations simultaneously
- Controlled by `OTEL_INSTRUMENTATION_GENAI_EVALS_WORKERS`

Level 2 (DeepEval's AsyncConfig):
- Within each invocation, multiple metrics (Bias, Toxicity, etc.) run in parallel
- Controlled by `AsyncConfig(run_async=True, max_concurrent=10)`
| Level | What it Parallelizes | Benefit |
|---|---|---|
| Level 1 (Workers) | Different LLM invocations | Multiple conversations evaluated simultaneously |
| Level 2 (AsyncConfig) | Metrics within one invocation | Bias, Toxicity, Relevancy run in parallel |
This combination provides multiplicative performance gains:
- 4 workers × 4 parallel metrics = up to 16 concurrent LLM-as-a-Judge calls
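The arithmetic behind the multiplicative claim, with an idealized speedup estimate (the invocation count and per-call latency are illustrative assumptions; real throughput is bounded by API rate limits and `max_concurrent`):

```python
workers = 4           # OTEL_INSTRUMENTATION_GENAI_EVALS_WORKERS
metrics_per_eval = 4  # Bias, Toxicity, Answer Relevancy, Faithfulness

# Level 1 × Level 2: maximum LLM-as-a-Judge calls in flight
concurrent_judge_calls = workers * metrics_per_eval
print(concurrent_judge_calls)  # → 16

# Idealized drain-time comparison for a batch (illustrative numbers)
invocations = 100
seconds_per_judge_call = 3.0

# Sequential: every metric of every invocation waits in line
sequential = invocations * metrics_per_eval * seconds_per_judge_call
# Concurrent: both levels overlap, bounded by calls in flight
concurrent = sequential / concurrent_judge_calls
print(sequential, concurrent)  # → 1200.0 75.0
```

This is an upper bound: it assumes every in-flight slot stays busy and the judge API never throttles.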
If your LLM API has rate limits, you can adjust both levels:

```shell
# Reduce worker threads
export OTEL_INSTRUMENTATION_GENAI_EVALS_WORKERS=2

# Note: max_concurrent is hardcoded to 10 in our implementation.
# For further rate limiting, reduce workers or consider throttle_value.
```

For more details on DeepEval's async configuration options, see DeepEval's Async Configs documentation.
Sequential mode:

    Invocation → Queue → Worker → Evaluator.evaluate() → sync LLM call → Results
         ↑                                                                  │
         └──────────────────────────────────────────────────────────────────┘
                                 (blocks until complete)
Concurrent mode:

                        ┌──► Worker 1 → asyncio → Evaluator.evaluate_async() ──┐
                        │                                                      │
    Invocation → Queue ─┼──► Worker 2 → asyncio → async LLM call ──────────────┼──► Results
                        │                                                      │
                        └──► Worker N → asyncio → (non-blocking) ──────────────┘
| Variable | Default | Description |
|---|---|---|
| `OTEL_INSTRUMENTATION_GENAI_EVALS_CONCURRENT` | `false` | Enable concurrent mode |
| `OTEL_INSTRUMENTATION_GENAI_EVALS_WORKERS` | `4` | Number of worker threads |
| `OTEL_INSTRUMENTATION_GENAI_EVALS_QUEUE_SIZE` | `0` | Queue size (0 = unbounded) |
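The parsing helpers referenced by the Manager (`read_concurrent_flag()`, `read_worker_count()`, `read_queue_size()`) are not shown in this document; a minimal sketch consistent with the defaults in the table above (the actual implementation may differ):

```python
import os

def read_concurrent_flag() -> bool:
    """OTEL_INSTRUMENTATION_GENAI_EVALS_CONCURRENT, default false."""
    raw = os.environ.get("OTEL_INSTRUMENTATION_GENAI_EVALS_CONCURRENT", "false")
    return raw.strip().lower() in ("true", "1", "yes")

def read_worker_count() -> int:
    """OTEL_INSTRUMENTATION_GENAI_EVALS_WORKERS, default 4, minimum 1."""
    try:
        count = int(os.environ.get("OTEL_INSTRUMENTATION_GENAI_EVALS_WORKERS", "4"))
    except ValueError:
        return 4
    return max(1, count)

def read_queue_size() -> int:
    """OTEL_INSTRUMENTATION_GENAI_EVALS_QUEUE_SIZE, default 0 (unbounded)."""
    try:
        size = int(os.environ.get("OTEL_INSTRUMENTATION_GENAI_EVALS_QUEUE_SIZE", "0"))
    except ValueError:
        return 0
    return max(0, size)
```

Falling back to the default on malformed input keeps a typo in one variable from disabling evaluations entirely.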
Development / Low Volume:
```shell
# Use defaults (sequential mode)
# No additional configuration needed
```

Production / High Volume:

```shell
export OTEL_INSTRUMENTATION_GENAI_EVALS_CONCURRENT=true
export OTEL_INSTRUMENTATION_GENAI_EVALS_WORKERS=4
export OTEL_INSTRUMENTATION_GENAI_EVALS_QUEUE_SIZE=100
```

Rate-Limited APIs:

```shell
export OTEL_INSTRUMENTATION_GENAI_EVALS_CONCURRENT=true
export OTEL_INSTRUMENTATION_GENAI_EVALS_WORKERS=2    # Reduce to avoid rate limits
export OTEL_INSTRUMENTATION_GENAI_EVALS_QUEUE_SIZE=50
```

When OTEL_INSTRUMENTATION_GENAI_EVALS_QUEUE_SIZE is set:
```python
def enqueue(self, invocation: GenAI):
    try:
        self._queue.put_nowait(invocation)
    except queue.Full:
        # Drop the item rather than blocking the application thread.
        # Catching queue.Full avoids the race of a full() check followed
        # by a put, where another thread fills the queue in between.
        _LOGGER.warning(
            "Evaluation queue full (size=%d), dropping invocation",
            self._queue.maxsize,
        )
```

Benefits:
- Prevents unbounded memory growth
- Provides backpressure signal
- Graceful degradation under load
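The drop-on-full behavior can be exercised in isolation with a plain `queue.Queue`; the bound of 2 here is illustrative:

```python
import queue

bounded = queue.Queue(maxsize=2)  # illustrative bound
accepted = 0
dropped = 0

for invocation in range(5):
    try:
        bounded.put_nowait(invocation)
        accepted += 1
    except queue.Full:
        # In the Manager this is where the warning is logged.
        dropped += 1

print(accepted, dropped)  # → 2 3
```

Once the bound is reached, every further enqueue is dropped instead of blocking, which is exactly the backpressure signal described above.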
The implementation ensures thread safety through:
- Thread-safe Queue: Python's `queue.Queue` is thread-safe
- Per-worker Event Loops: Each worker has its own asyncio loop
- No Shared Mutable State: Workers don't share evaluation state
- Atomic Operations: Queue operations are atomic
```python
def shutdown(self):
    """Gracefully shut down all workers."""
    self._shutdown.set()  # Signal workers to stop
    # Wait for the queue to drain
    self._queue.join()
    # Wait for all workers to complete
    for worker in self._workers:
        worker.join(timeout=5.0)
```

| Metric | Sequential | Concurrent (4 workers) |
|---|---|---|
| Throughput | 1 eval/time | ~4 evals/time |
| Latency | O(n) | O(n/workers) |
| Memory | Low | Moderate |
| CPU | Single core | Multi-core |
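The `self._queue.join()` call in `shutdown()` only returns if workers mark every dequeued item with `task_done()`. A minimal sketch of that contract (simplified to one worker, with a sentinel standing in for the Manager's shutdown Event):

```python
import queue
import threading

q: "queue.Queue[str | None]" = queue.Queue()
processed: list[str] = []

def worker() -> None:
    while True:
        item = q.get()
        try:
            if item is None:
                return  # shutdown sentinel
            processed.append(item)
        finally:
            q.task_done()  # required, or q.join() blocks forever

t = threading.Thread(target=worker, daemon=True)
t.start()

for name in ("eval-a", "eval-b"):
    q.put(name)
q.join()        # blocks until both items are marked done
q.put(None)     # then signal shutdown
t.join(timeout=5.0)
print(processed)  # → ['eval-a', 'eval-b']
```

Putting `task_done()` in a `finally` block keeps the unfinished-task counter accurate even when an evaluation raises, so `join()` cannot deadlock on a failed item.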
- Default behavior unchanged: Sequential mode is default
- No breaking changes: Existing code works without modification
- Opt-in concurrent mode: Must explicitly enable via environment variable
- Same evaluation results: Quality and accuracy unchanged
Unit tests cover:

- Environment Variable Parsing (`test_env.py`)
  - `read_concurrent_flag()`
  - `read_worker_count()`
  - `read_queue_size()`
- Manager Behavior (`test_evaluation_manager.py`)
  - Concurrent mode initialization
  - Worker count configuration
  - Queue size bounds
  - Shutdown handling
- Async Evaluators (`test_evaluators.py`)
  - `supports_async` property
  - `evaluate_async()` delegation
  - Native async implementations
- DeepEval Runner (`test_deepeval_runner.py`)
  - `run_evaluation_async()`
  - AsyncConfig integration
  - Error handling
Potential improvements:
- Adaptive Worker Scaling: Adjust workers based on queue depth
- Priority Queue: Process critical evaluations first
- Circuit Breaker: Handle API failures gracefully
- Metrics Export: Worker pool utilization metrics
- Rate Limiting: Built-in rate limiting per worker