-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmanaged_model.py
More file actions
91 lines (74 loc) · 2.87 KB
/
managed_model.py
File metadata and controls
91 lines (74 loc) · 2.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import asyncio
from typing import List
from ldai import log
from ldai.models import AICompletionConfig
from ldai.providers.runner import Runner
from ldai.providers.types import JudgeResult, ManagedResult, RunnerResult
from ldai.tracker import LDAIConfigTracker
class ManagedModel:
"""
LaunchDarkly managed wrapper for AI model invocations.
Holds a Runner. Handles judge evaluation dispatch and tracking
automatically via ``create_tracker()``. Conversation history is
managed by the runner. Obtain an instance via
``LDAIClient.create_model()``.
"""
def __init__(
self,
ai_config: AICompletionConfig,
model_runner: Runner,
):
self._ai_config = ai_config
self._model_runner = model_runner
async def run(self, prompt: str) -> ManagedResult:
"""
Run the model with a prompt string.
Delegates to the runner, then dispatches judge evaluations and
records tracking metrics.
:param prompt: The user prompt to send to the model
:return: ManagedResult containing the model's response, metric summary,
and an optional evaluations task
"""
tracker = self._ai_config.create_tracker()
result: RunnerResult = await tracker.track_metrics_of_async(
lambda r: r.metrics,
lambda: self._model_runner.run(prompt),
)
evaluations_task = await self._track_judge_results(tracker, prompt, result.content)
return ManagedResult(
content=result.content,
metrics=tracker.get_summary(),
raw=result.raw,
parsed=result.parsed,
evaluations=evaluations_task,
)
async def _track_judge_results(
self,
tracker: LDAIConfigTracker,
input_text: str,
output_text: str,
) -> asyncio.Task[List[JudgeResult]]:
evaluator_task = self._ai_config.evaluator.evaluate(input_text, output_text)
async def _run_and_track(eval_task: asyncio.Task) -> List[JudgeResult]:
results = await eval_task
for r in results:
if not r.sampled:
continue
if r.success:
try:
tracker.track_judge_result(r)
except Exception as exc:
log.warning("Judge evaluation failed: %s", exc)
else:
log.warning("Judge evaluation failed: %s", r.error_message)
return results
return asyncio.create_task(_run_and_track(evaluator_task))
def get_model_runner(self) -> Runner:
"""
Return the underlying runner for advanced use.
:return: The Runner instance.
"""
return self._model_runner
def get_config(self) -> AICompletionConfig:
"""Return the AI completion config."""
return self._ai_config