Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions packages/sdk/server-ai/src/ldai/managed_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,16 @@ def _track_judge_results(
input_text: str,
output_text: str,
) -> asyncio.Task[List[JudgeResult]]:
eval_task = self._ai_config.evaluator.evaluate(input_text, output_text)

def _on_done(task: asyncio.Task) -> None:
if task.cancelled():
return
if task.exception() is not None:
return
for r in task.result():
evaluator_task = self._ai_config.evaluator.evaluate(input_text, output_text)

async def _run_and_track(eval_task: asyncio.Task) -> List[JudgeResult]:
results = await eval_task
for r in results:
if r.success:
tracker.track_judge_result(r)
return results
Comment thread
cursor[bot] marked this conversation as resolved.

eval_task.add_done_callback(_on_done)
return eval_task
return asyncio.create_task(_run_and_track(evaluator_task))

def get_messages(self, include_config_messages: bool = False) -> List[LDMessage]:
"""
Expand Down
237 changes: 237 additions & 0 deletions packages/sdk/server-ai/tests/test_managed_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
"""Tests for ManagedModel — specifically the evaluations tracking chain."""

import asyncio
from typing import List
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from ldai.evaluator import Evaluator
from ldai.managed_model import ManagedModel
from ldai.models import AICompletionConfig, LDMessage, ModelConfig, ProviderConfig
from ldai.providers.types import JudgeResult, LDAIMetrics, ModelResponse
from ldai.tracker import LDAIConfigTracker


def _make_ai_completion_config(evaluator: Evaluator) -> AICompletionConfig:
    """Return a minimal, enabled AICompletionConfig that uses ``evaluator``.

    The ``create_tracker`` factory is a mock that returns a mock spec'd to
    ``LDAIConfigTracker``; the config carries a single system message.
    """
    tracker_factory = MagicMock(return_value=MagicMock(spec=LDAIConfigTracker))
    system_prompt = LDMessage(role='system', content='You are helpful.')
    return AICompletionConfig(
        key='test-config',
        enabled=True,
        create_tracker=tracker_factory,
        model=ModelConfig('gpt-4'),
        provider=ProviderConfig('openai'),
        messages=[system_prompt],
        evaluator=evaluator,
    )


def _make_model_response(content: str = 'response text') -> ModelResponse:
    """Return a ModelResponse with an assistant message and success metrics.

    :param content: Text placed in the assistant message.
    """
    assistant_message = LDMessage(role='assistant', content=content)
    success_metrics = LDAIMetrics(success=True, usage=None)
    return ModelResponse(message=assistant_message, metrics=success_metrics)


class TestManagedModelInvokeReturnsImmediately:
    """invoke() must return before the evaluations task resolves.

    Every test drives ``ManagedModel.invoke`` with a mocked runner and tracker
    and then inspects ``response.evaluations``.
    """

    @staticmethod
    def _make_task_evaluator(evaluate_coro) -> MagicMock:
        """Return an Evaluator mock whose ``evaluate`` schedules ``evaluate_coro``.

        ``evaluate_coro`` is a coroutine function taking
        ``(input_text, output_text)``; each ``evaluate`` call wraps it in a
        new ``asyncio`` task.
        """
        evaluator = MagicMock(spec=Evaluator)
        evaluator.evaluate = MagicMock(
            side_effect=lambda i, o: asyncio.create_task(evaluate_coro(i, o))
        )
        return evaluator

    @staticmethod
    def _make_tracker() -> MagicMock:
        """Return a tracker mock with awaitable metrics tracking and a judge-result spy."""
        tracker = MagicMock(spec=LDAIConfigTracker)
        tracker.track_metrics_of_async = AsyncMock(return_value=_make_model_response())
        tracker.track_judge_result = MagicMock()
        return tracker

    @staticmethod
    def _make_model(evaluator, tracker) -> ManagedModel:
        """Return a ManagedModel built from a mocked runner and a fresh config.

        The config's ``create_tracker`` factory returns ``tracker`` and its
        message list is empty.
        """
        runner = MagicMock()
        runner.invoke_model = AsyncMock(return_value=_make_model_response())
        config = AICompletionConfig(
            key='test-config',
            enabled=True,
            create_tracker=MagicMock(return_value=tracker),
            model=ModelConfig('gpt-4'),
            provider=ProviderConfig('openai'),
            messages=[],
            evaluator=evaluator,
        )
        return ManagedModel(config, runner)

    @pytest.mark.asyncio
    async def test_invoke_returns_before_evaluations_resolve(self):
        """invoke() should return a ModelResponse before evaluations complete."""
        # The evaluation coroutine blocks on this event until the test releases it.
        barrier = asyncio.Event()

        async def _slow_evaluate(input_text: str, output_text: str) -> List[JudgeResult]:
            await barrier.wait()
            return []

        model = self._make_model(
            self._make_task_evaluator(_slow_evaluate),
            self._make_tracker(),
        )

        try:
            response = await model.invoke('Hello')

            # invoke() returned — evaluations task should still be pending
            assert response is not None
            assert response.evaluations is not None
            assert not response.evaluations.done(), "evaluations task should still be pending"
        finally:
            # Release the barrier even when an assertion fails, so the
            # evaluation coroutine is never left waiting on it.
            barrier.set()

        # Let the evaluations task finish cleanly.
        await response.evaluations

    @pytest.mark.asyncio
    async def test_await_evaluations_collects_results(self):
        """await response.evaluations should return the list of JudgeResult instances."""
        judge_result = JudgeResult(
            judge_config_key='judge-key',
            success=True,
            sampled=True,
            metric_key='$ld:ai:judge:relevance',
            score=0.9,
            reasoning='Good response',
        )

        async def _evaluate_coro(input_text: str, output_text: str) -> List[JudgeResult]:
            return [judge_result]

        model = self._make_model(
            self._make_task_evaluator(_evaluate_coro),
            self._make_tracker(),
        )
        response = await model.invoke('Hello')

        results = await response.evaluations  # type: ignore[misc]
        assert results == [judge_result]

    @pytest.mark.asyncio
    async def test_tracking_fires_inside_awaited_chain(self):
        """tracker.track_judge_result() must be called when evaluations are awaited."""
        judge_result = JudgeResult(
            judge_config_key='judge-key',
            success=True,
            sampled=True,
            metric_key='$ld:ai:judge:relevance',
            score=0.85,
            reasoning='Relevant answer',
        )

        async def _evaluate_coro(input_text: str, output_text: str) -> List[JudgeResult]:
            return [judge_result]

        mock_tracker = self._make_tracker()
        model = self._make_model(self._make_task_evaluator(_evaluate_coro), mock_tracker)
        response = await model.invoke('Hello')

        # Tracking should NOT have fired yet (before we await evaluations)
        mock_tracker.track_judge_result.assert_not_called()

        # Now await the evaluations task — tracking fires inside the chain
        await response.evaluations  # type: ignore[misc]

        mock_tracker.track_judge_result.assert_called_once_with(judge_result)

    @pytest.mark.asyncio
    async def test_tracking_not_called_for_failed_judge_result(self):
        """tracker.track_judge_result() should NOT be called for unsuccessful judge results."""
        failed_result = JudgeResult(
            success=False,
            sampled=True,
            metric_key='$ld:ai:judge:relevance',
            error_message='Judge evaluation failed',
        )

        async def _evaluate_coro(input_text: str, output_text: str) -> List[JudgeResult]:
            return [failed_result]

        mock_tracker = self._make_tracker()
        model = self._make_model(self._make_task_evaluator(_evaluate_coro), mock_tracker)
        response = await model.invoke('Hello')
        await response.evaluations  # type: ignore[misc]

        mock_tracker.track_judge_result.assert_not_called()

    @pytest.mark.asyncio
    async def test_noop_evaluator_returns_empty_list(self):
        """With a noop evaluator, awaiting evaluations should return an empty list."""
        model = self._make_model(Evaluator.noop(), self._make_tracker())
        response = await model.invoke('Hello')
        results = await response.evaluations  # type: ignore[misc]

        assert results == []
Loading