-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathclient.py
More file actions
2489 lines (2231 loc) · 108 KB
/
client.py
File metadata and controls
2489 lines (2231 loc) · 108 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Client for LaunchDarkly AI agent optimization.
Security note — LAUNCHDARKLY_API_KEY scope
-------------------------------------------
When set, the ``LAUNCHDARKLY_API_KEY`` environment variable is used solely to
authenticate discrete LaunchDarkly REST API calls (e.g. fetching optimization
configs, publishing results via ``auto_commit``). It is:
- Never included in any LLM prompt.
- Never forwarded to user-supplied ``handle_agent_call`` or ``handle_judge_call``
callbacks.
- Never accessible to any external service other than the LaunchDarkly REST API.
All LaunchDarkly API calls are isolated requests; they carry no information
about the caller's broader runtime environment beyond the key itself.
"""
import dataclasses
import json
import logging
import os
import random
import time
import uuid
from typing import Any, Dict, List, Literal, Optional, Union
from ldai import AIAgentConfig, AIJudgeConfig, AIJudgeConfigDefault, LDAIClient
from ldai.models import LDMessage, ModelConfig
from ldclient import Context
from ldai_optimizer.dataclasses import (
AIJudgeCallConfig,
GroundTruthOptimizationOptions,
GroundTruthSample,
HandleJudgeCall,
JudgeResult,
OptimizationContext,
OptimizationFromConfigOptions,
OptimizationJudge,
OptimizationJudgeContext,
OptimizationOptions,
OptimizationResponse,
ToolDefinition,
)
from ldai_optimizer.ld_api_client import (
AgentOptimizationConfig,
AgentOptimizationResultPatch,
AgentOptimizationResultPost,
LDApiClient,
)
from ldai_optimizer.prompts import (
_acceptance_criteria_implies_cost_optimization,
_acceptance_criteria_implies_duration_optimization,
build_message_history_text,
build_new_variation_prompt,
build_reasoning_history,
)
from ldai_optimizer.util import (
RedactionFilter,
await_if_needed,
estimate_cost,
extract_json_from_response,
generate_slug,
interpolate_variables,
judge_passed,
restore_variable_placeholders,
validate_variation_response,
)
# Module-level logger. A RedactionFilter (imported from ldai_optimizer.util) is
# attached so every record emitted through this logger passes through it first.
# NOTE(review): presumably the filter scrubs secrets such as the API key from
# log output — confirm against RedactionFilter's implementation.
logger = logging.getLogger(__name__)
logger.addFilter(RedactionFilter())
def _find_model_config(
    model_name: str, configs: List[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """Pick the model config entry that best matches ``model_name``.

    Entries are matched on their ``id`` field. If several entries share the
    requested id, the first one whose ``global`` field is exactly ``True`` wins;
    when none is global, the first matching entry is returned instead.

    :param model_name: The model id to look up.
    :param configs: List of model config dicts from the LD API.
    :return: The preferred matching config dict, or None when nothing matches.
    """
    first_match: Optional[Dict[str, Any]] = None
    for candidate in configs:
        if candidate.get("id") != model_name:
            continue
        if candidate.get("global") is True:
            # A global entry always beats project-specific ones.
            return candidate
        if first_match is None:
            first_match = candidate
    return first_match
def _strip_provider_prefix(model: str) -> str:
    """Drop the leading ``Provider.`` segment from an LD API model identifier.

    The API formats model keys as "Provider.model-name" (for example
    "OpenAI.gpt-5" or "Anthropic.claude-opus-4.6"). Everything after the first
    period is kept verbatim, so later periods in the model name survive. A
    string without any period is returned unchanged.

    :param model: Raw model string from the API.
    :return: Model name with the provider prefix removed.
    """
    _provider, separator, remainder = model.partition(".")
    # An empty separator means there was no period, hence no prefix to strip.
    return remainder if separator else model
def _compute_validation_count(pool_size: int) -> int:
    """Decide how many validation samples to run once a candidate passes in chaos mode.

    The count is one quarter of the pool size (integer division), clamped to
    the inclusive range 2..5 so that small pools still get a minimal
    cross-check and large pools do not trigger unbounded validation work.

    :param pool_size: Total number of distinct choices in the sampling pool
        (user_input_options count when provided, otherwise variable_choices count).
    :return: Number of validation samples to run (between 2 and 5 inclusive).
    """
    lower_bound, upper_bound = 2, 5
    quarter = pool_size // 4
    if quarter < lower_bound:
        return lower_bound
    if quarter > upper_bound:
        return upper_bound
    return quarter
# Maximum number of attempts for variation generation. Transient empty or
# unparseable responses from the LLM are retried up to this many times before
# the variation step is treated as a failure.
_MAX_VARIATION_RETRIES = 3
# Duration gate: a candidate must be at least this much faster than the baseline
# (history[0].duration_ms) to pass the duration check when acceptance criteria
# imply a latency optimization goal. 0.80 means the candidate must clock in at
# under 80% of the baseline — i.e. at least 20% improvement.
# NOTE(review): the comparison that applies this factor is not in this part of
# the file — confirm whether the boundary is strict (<) or inclusive (<=).
_DURATION_TOLERANCE = 0.80
# Cost gate: a candidate must cost at most this fraction of the baseline
# (history[0].estimated_cost_usd) to pass when acceptance criteria imply a
# cost reduction goal. 0.80 means at least 20% cheaper than the baseline.
_COST_TOLERANCE = 0.80
# Maps SDK status strings to the API status/activity values expected by
# agent_optimization_result records. Defined at module level to avoid
# allocating the dict on every on_status_update invocation.
# The keys are the same eight status strings accepted by
# OptimizationClient._safe_status_update. "evaluating" and "validating" both
# map to the EVALUATING activity; only "success" and "failure" leave the
# RUNNING status.
_OPTIMIZATION_STATUS_MAP: Dict[str, Dict[str, str]] = {
"init": {"status": "RUNNING", "activity": "PENDING"},
"generating": {"status": "RUNNING", "activity": "GENERATING"},
"evaluating": {"status": "RUNNING", "activity": "EVALUATING"},
"generating variation": {"status": "RUNNING", "activity": "GENERATING_VARIATION"},
"validating": {"status": "RUNNING", "activity": "EVALUATING"},
"turn completed": {"status": "RUNNING", "activity": "COMPLETED"},
"success": {"status": "PASSED", "activity": "COMPLETED"},
"failure": {"status": "FAILED", "activity": "COMPLETED"},
}
# Client that runs optimization iterations for a LaunchDarkly AI agent and
# scores each iteration with judges. The annotations below declare instance
# attributes; several of them are assigned outside __init__.
class OptimizationClient:
# Options for the active run (judges, callbacks, judge model, context choices).
_options: OptimizationOptions
# LaunchDarkly AI client supplied to __init__.
_ldClient: LDAIClient
# Base agent config; its create_tracker and provider are reused when building
# per-iteration configs. NOTE(review): not assigned in the code visible here.
_agent_config: AIAgentConfig
# True when LAUNCHDARKLY_API_KEY was set to a non-empty value at construction.
_has_api_key: bool
# Value of LAUNCHDARKLY_API_KEY, or None when it is unset or empty.
_api_key: Optional[str]
# Key of the agent being optimized. NOTE(review): not assigned in the code
# visible here.
_agent_key: str
# Raw (un-interpolated) instruction template captured by _get_agent_config.
_initial_instructions: str
def __init__(self, ldClient: LDAIClient) -> None:
    """
    Create an optimization client bound to a LaunchDarkly AI client.

    Resets the per-run bookkeeping fields and records whether a
    ``LAUNCHDARKLY_API_KEY`` environment variable is available. A warning is
    logged when the variable is missing or empty.

    :param ldClient: The LaunchDarkly AI client used for config lookups
    """
    self._ldClient = ldClient

    # Bookkeeping for the most recent optimization run.
    self._last_run_succeeded: bool = False
    self._last_succeeded_context: Optional[OptimizationContext] = None
    self._last_optimization_result_id: Optional[str] = None
    self._initial_tool_keys: List[str] = []
    self._total_token_usage: int = 0
    self._model_configs: List[Dict[str, Any]] = []

    api_key = os.environ.get("LAUNCHDARKLY_API_KEY")
    if not api_key:
        # Unset and empty-string values are both treated as "no key".
        self._has_api_key = False
        self._api_key = None
        logger.warning(
            "LAUNCHDARKLY_API_KEY is not set, functionality will be limited"
        )
    else:
        self._has_api_key = True
        self._api_key = api_key
def _initialize_class_members_from_config(
    self, agent_config: AIAgentConfig
) -> None:
    """
    Seed the mutable optimization state from an agent configuration.

    Stores the config's instructions, model parameters and model name as the
    "current" values and starts with an empty iteration history. Missing model
    information falls back to an empty parameter dict and a ``None`` model name.

    :param agent_config: The agent configuration to read the initial state from
    :raises ValueError: If the agent config has no instructions
    """
    instructions = agent_config.instructions
    if not instructions:
        raise ValueError(
            f"Agent '{agent_config.key}' has no instructions configured. "
            "Ensure the agent flag has instructions set before running an optimization."
        )
    self._current_instructions = instructions

    model = agent_config.model
    # The parameter dict is stored as-is (not copied) when the model provides one.
    self._current_parameters: Dict[str, Any] = (
        (model._parameters or {}) if model else {}
    )
    self._current_model: Optional[str] = model.name if model else None
    self._history: List[OptimizationContext] = []
def _build_agent_config_for_context(
    self, ctx: OptimizationContext, skip_interpolation: bool = False
) -> AIAgentConfig:
    """
    Build the AIAgentConfig that represents the variation held in ``ctx``.

    The instructions, model name and parameters come from the context, so the
    caller sees the candidate under evaluation instead of the base config.
    The tracker factory and provider are taken from the stored base agent config.
    ``{{placeholder}}`` tokens are rendered from ctx.current_variables into a
    new string; the template stored on the context is left untouched.

    :param ctx: The OptimizationContext for this iteration
    :param skip_interpolation: When True, leave the instructions un-rendered.
        Intended for meta-prompts (e.g. a variation-generation prompt) whose
        ``{{key}}`` tokens are text for the LLM rather than substitution targets.
    :return: A fresh AIAgentConfig populated from the context's current state
    """
    if skip_interpolation or not ctx.current_variables:
        # Nothing to substitute, or the caller asked for the raw template.
        rendered_instructions = ctx.current_instructions
    else:
        rendered_instructions = interpolate_variables(
            ctx.current_instructions, ctx.current_variables
        )

    model_config = ModelConfig(
        name=ctx.current_model or "",
        parameters=ctx.current_parameters,
    )
    base_config = self._agent_config
    return AIAgentConfig(
        key=self._agent_key,
        enabled=True,
        create_tracker=base_config.create_tracker,
        model=model_config,
        instructions=rendered_instructions,
        provider=base_config.provider,
    )
def _create_optimization_context(
    self,
    iteration: int,
    variables: Dict[str, Any],
    user_input: Optional[str] = None,
    completion_response: str = "",
    scores: Optional[Dict[str, JudgeResult]] = None,
) -> OptimizationContext:
    """
    Snapshot the current optimization state into a new OptimizationContext.

    Previous contexts are attached as a flat tuple: each one is first passed
    through ``copy_without_history()`` so the history does not nest. The
    current parameters are shallow-copied; ``variables`` is stored as given.

    :param iteration: Current iteration number
    :param variables: Variable set chosen for this iteration
    :param user_input: Optional user input for this iteration
    :param completion_response: Completion response string
    :param scores: Optional dictionary of judge results; a falsy value becomes ``{}``
    :return: A new OptimizationContext instance
    """
    prior_turns = tuple(
        earlier.copy_without_history() for earlier in self._history
    )
    return OptimizationContext(
        iteration=iteration,
        user_input=user_input,
        completion_response=completion_response,
        scores=scores if scores else {},
        current_instructions=self._current_instructions,
        current_parameters=self._current_parameters.copy(),
        current_variables=variables,
        current_model=self._current_model,
        history=prior_turns,
    )
@property
def _judge_call(self) -> HandleJudgeCall:
    """Callable used to invoke judges: handle_judge_call if set, otherwise handle_agent_call."""
    judge_handler = self._options.handle_judge_call
    if judge_handler:
        return judge_handler
    return self._options.handle_agent_call
def _safe_status_update(
    self,
    status: Literal[
        "init",
        "generating",
        "evaluating",
        "generating variation",
        "validating",
        "turn completed",
        "success",
        "failure",
    ],
    context: OptimizationContext,
    iteration: int,
) -> None:
    """
    Notify the optional on_status_update callback without letting it break the run.

    The callback receives the status and a history-free copy of the context.
    Any exception raised while copying the context or running the callback is
    logged (with traceback) and swallowed.

    :param status: The status string to pass to the callback
    :param context: The optimization context to pass to the callback
    :param iteration: Current iteration number for logging
    """
    if not self._options.on_status_update:
        # No listener registered; nothing to do.
        return
    try:
        flat_context = context.copy_without_history()
        self._options.on_status_update(status, flat_context)
    except Exception:
        logger.exception(
            "[Iteration %d] -> on_status_update callback failed", iteration
        )
def _judge_config(
    self,
    judge_key: str,
    context: Context,
    default: AIJudgeConfigDefault,
    variables: Dict[str, Any],
) -> AIJudgeConfig:
    """
    Resolve a judge configuration through the LaunchDarkly AI client.

    Delegates directly to ``LDAIClient.judge_config`` with the arguments in
    the same order, so callers never have to touch the client themselves.

    :param judge_key: The key for the judge configuration in LaunchDarkly
    :param context: The evaluation context
    :param default: Fallback config when the flag is disabled or unreachable
    :param variables: Template variables for instruction interpolation
    :return: The resolved AIJudgeConfig
    """
    fetch_judge_config = self._ldClient.judge_config
    resolved = fetch_judge_config(judge_key, context, default, variables)
    return resolved
def _serialize_scores(
    self, judge_results: Dict[str, JudgeResult]
) -> Dict[str, Any]:
    """
    Turn a mapping of judge results into plain data via each result's ``to_json()``.

    Keys and their order are preserved.

    :param judge_results: Dictionary of judge keys to JudgeResult instances
    :return: Dictionary suitable for json.dumps
    """
    serialized: Dict[str, Any] = {}
    for judge_name, judge_result in judge_results.items():
        serialized[judge_name] = judge_result.to_json()
    return serialized
def _extract_agent_tools(self, parameters: Dict[str, Any]) -> List[ToolDefinition]:
    """
    Pull the ``tools`` entry out of a parameter dict as ToolDefinition objects.

    A single non-list value is treated as a one-element list. Entries that are
    already ToolDefinition instances are kept, objects exposing ``to_dict()``
    and plain dicts are converted through ``ToolDefinition.from_dict``, and
    anything else is skipped without error.

    :param parameters: The agent's current_parameters dict
    :return: List of ToolDefinition instances, empty list if no tools are configured
    """
    configured = parameters.get("tools", [])
    if not configured:
        return []

    candidates = configured if isinstance(configured, list) else [configured]
    normalised = []
    for entry in candidates:
        if isinstance(entry, ToolDefinition):
            normalised.append(entry)
            continue
        if hasattr(entry, "to_dict"):
            normalised.append(ToolDefinition.from_dict(entry.to_dict()))
            continue
        if isinstance(entry, dict):
            normalised.append(ToolDefinition.from_dict(entry))
        # Unrecognised entry types fall through and are dropped.
    return normalised
def _parse_judge_response(
    self,
    response_str: str,
    judge_key: str,
    judge_identifier: str,
    iteration: int,
    clamp_score: bool = True,
) -> JudgeResult:
    """
    Parse a structured LLM judge response into a JudgeResult.

    Expects a JSON object with "score" (float) and optionally "rationale"
    (str). A missing "score" is read as 0.0. On any parsing failure —
    including a non-finite score (NaN or ±Infinity) — the exception is logged
    and a zero score is returned.

    :param response_str: Raw string response from the judge LLM
    :param judge_key: Key used to identify this judge in results dicts
        (not used by the parsing itself)
    :param judge_identifier: Human-readable identifier for log messages
    :param iteration: Current iteration number for logging
    :param clamp_score: When True, clamps score to [0.0, 1.0]
    :return: Parsed JudgeResult, or a zero-score result on failure
    """
    # Local import: math is not among the module-level imports of this file.
    import math

    try:
        response_data = extract_json_from_response(response_str)
        score = float(response_data.get("score", 0.0))
        if not math.isfinite(score):
            # NaN would slip through the clamp below as 1.0, because
            # min(1.0, nan) returns 1.0, and ±inf would pass through unclamped.
            # Neither is a meaningful score, so treat it as a parse failure.
            raise ValueError(f"judge returned a non-finite score: {score!r}")
        if clamp_score:
            score = max(0.0, min(1.0, score))
        rationale = response_data.get("rationale")
        return JudgeResult(score=score, rationale=rationale)
    except Exception:
        # Deliberately broad: a malformed judge reply must never abort the run.
        logger.exception(
            "[Iteration %d] -> Failed to parse judge response for %s",
            iteration,
            judge_identifier,
        )
        return JudgeResult(score=0.0, rationale=None)
async def _call_judges(
    self,
    completion_response: str,
    iteration: int,
    user_input: str,
    variables: Optional[Dict[str, Any]] = None,
    agent_tools: Optional[List[ToolDefinition]] = None,
    expected_response: Optional[str] = None,
    agent_duration_ms: Optional[float] = None,
    agent_usage: Optional[Any] = None,
) -> Dict[str, JudgeResult]:
    """
    Run every configured judge against an agent response (auto-path).

    Judges are awaited one after another in the order of ``options.judges``;
    they are not run concurrently.
    For judges with judge_key: Fetches judge config on-demand from LaunchDarkly SDK.
    For judges with acceptance_statement: Uses handle_judge_call callback.
    A judge that raises is logged and recorded with a score of 0.0 so the
    remaining judges still run.

    :param completion_response: The agent's completion response to evaluate
    :param iteration: Current iteration number
    :param user_input: The user's question for this turn, forwarded to judges so
        they know what was actually asked (the current turn is not yet in
        self._history when judges run)
    :param variables: The variable set that was used during the agent generation
    :param agent_tools: Normalised list of tool definitions that were available to the agent
    :param expected_response: Optional ground truth expected response. When provided,
        judges are instructed to factor it into their scoring alongside acceptance criteria.
    :param agent_duration_ms: Wall-clock duration of the agent call in milliseconds.
        Forwarded to acceptance judges whose statement implies a latency goal so they
        can mention the duration change in their rationale.
    :param agent_usage: Token usage from the agent call. Forwarded to acceptance judges
        whose statement implies a cost goal so they can mention token usage in their rationale.
    :return: Dictionary of judge results (score and rationale); empty when no
        judges are configured
    """
    if not self._options.judges:
        return {}

    resolved_variables: Dict[str, Any] = variables or {}
    resolved_agent_tools: List[ToolDefinition] = agent_tools or []
    logger.info("[Iteration %d] -> Executing evaluation...", iteration)
    reasoning_history = build_reasoning_history(self._history)
    judge_results: Dict[str, JudgeResult] = {}
    judge_count = len(self._options.judges)

    for idx, (judge_key, optimization_judge) in enumerate(
        self._options.judges.items(), 1
    ):
        is_config_judge = optimization_judge.judge_key is not None
        logger.info(
            "[Iteration %d] -> Running judge %d/%d '%s' (%s)...",
            iteration,
            idx,
            judge_count,
            judge_key,
            "config" if is_config_judge else "acceptance",
        )
        try:
            if is_config_judge:
                result = await self._evaluate_config_judge(
                    judge_key,
                    optimization_judge,
                    completion_response,
                    iteration,
                    reasoning_history,
                    user_input=user_input,
                    variables=resolved_variables,
                    agent_tools=resolved_agent_tools,
                    expected_response=expected_response,
                )
            else:
                result = await self._evaluate_acceptance_judge(
                    judge_key,
                    optimization_judge,
                    completion_response,
                    iteration,
                    reasoning_history,
                    user_input=user_input,
                    variables=resolved_variables,
                    agent_tools=resolved_agent_tools,
                    expected_response=expected_response,
                    agent_duration_ms=agent_duration_ms,
                    agent_usage=agent_usage,
                )
            judge_results[judge_key] = result

            # A judge without an explicit threshold must score a perfect 1.0.
            threshold = (
                optimization_judge.threshold
                if optimization_judge.threshold is not None
                else 1.0
            )
            passed = judge_passed(
                result.score, threshold, optimization_judge.is_inverted
            )
            logger.debug(
                "[Iteration %d] -> Judge '%s' scored %.3f (threshold=%.3f, inverted=%s) -> %s%s",
                iteration,
                judge_key,
                result.score,
                threshold,
                optimization_judge.is_inverted,
                "PASSED" if passed else "FAILED",
                f" | {result.rationale}" if result.rationale else "",
            )
        except Exception:
            # Covers the whole per-judge step, so a failure while computing or
            # logging the pass/fail verdict also resets this judge to zero.
            logger.exception(
                "[Iteration %d] -> Judge %s evaluation failed", iteration, judge_key
            )
            judge_results[judge_key] = JudgeResult(score=0.0, rationale=None)

    # Serialising every result is wasted work unless the summary is emitted.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug(
            "[Iteration %d] -> Evaluation result: %s",
            iteration,
            json.dumps(self._serialize_scores(judge_results), indent=2),
        )
    return judge_results
async def _evaluate_config_judge(
    self,
    judge_key: str,
    optimization_judge: "OptimizationJudge",
    completion_response: str,
    iteration: int,
    reasoning_history: str,
    user_input: str,
    variables: Optional[Dict[str, Any]] = None,
    agent_tools: Optional[List[ToolDefinition]] = None,
    expected_response: Optional[str] = None,
) -> JudgeResult:
    """
    Evaluate using a config-type judge (with judge_key).

    The judge's messages are fetched from LaunchDarkly, collapsed into one
    system message and one user message, and sent through the judge callback
    using the global ``judge_model``. A disabled judge, or one with no
    messages, yields a zero score. The parsed score is not clamped.

    :param judge_key: The key for this judge in the judges dict
    :param optimization_judge: The optimization judge configuration
    :param completion_response: The agent's completion response to evaluate
    :param iteration: Current iteration number
    :param reasoning_history: Formatted string of reasoning from previous iterations
    :param user_input: The user's question for this turn
    :param variables: The variable set that was used during agent generation
    :param agent_tools: Normalised list of tool definitions that were available to the agent
    :param expected_response: Optional ground truth expected response. When provided,
        injected into template variables and judge messages.
    :return: The judge result with score and rationale, plus the judge call's
        duration and token usage
    """
    # Config-type judge: fetch judge config on-demand from LaunchDarkly SDK
    input_text = self._current_instructions or ""
    # Combine current instructions, history, and current question for message_history
    message_history_text = build_message_history_text(
        self._history, input_text, reasoning_history, user_input
    )
    # Merge agent variables so the judge's LD-managed instructions can reference
    # {{variable_name}} tokens alongside the standard judge template variables.
    template_variables: Dict[str, Any] = {
        **(variables or {}),
        "message_history": message_history_text,
        "response_to_evaluate": completion_response,
    }
    if expected_response is not None:
        template_variables["expected_response"] = expected_response

    # Type narrowing only: _call_judges routes here solely when judge_key is set.
    assert optimization_judge.judge_key is not None
    judge_config = self._judge_config(
        optimization_judge.judge_key,
        self._options.context_choices[0],
        AIJudgeConfigDefault(enabled=False),
        template_variables,
    )
    if not judge_config.enabled:
        logger.warning(
            "[Iteration %d] -> Judge %s is disabled",
            iteration,
            optimization_judge.judge_key,
        )
        return JudgeResult(score=0.0, rationale=None)
    if not judge_config.messages:
        logger.warning(
            "[Iteration %d] -> Judge %s has no messages",
            iteration,
            optimization_judge.judge_key,
        )
        return JudgeResult(score=0.0, rationale=None)

    # Split messages into system and user turns. System turns (each suffixed
    # with the JSON-format instruction) are joined into a single instructions
    # string; user turns are joined into a single user message. Messages with
    # any other role are not forwarded.
    system_parts = []
    user_parts = []
    for msg in judge_config.messages:
        if msg.role == "system":
            system_parts.append(
                msg.content
                + " Return your response as a JSON object with 'score' and 'rationale' fields."
            )
        elif msg.role == "user":
            user_parts.append(msg.content)
    instructions = "\n\n".join(system_parts)
    judge_user_input = (
        "\n\n".join(user_parts)
        if user_parts
        else f"Here is the response to evaluate: {completion_response}"
    )
    if expected_response is not None:
        judge_user_input += (
            f"\n\nHere is the expected response: {expected_response}"
            "\n\nEvaluate the actual response against both the acceptance criteria AND "
            "how closely it matches the expected response. Factor both into your score."
        )

    # Rebuild the message list with the updated system content so completions users
    # receive the same scoring instructions that are baked into `instructions`.
    updated_messages: List[LDMessage] = [
        LDMessage(role="system", content=instructions),
        LDMessage(role="user", content=judge_user_input),
    ]

    # Always use the global judge_model; model parameters (temperature, etc.) from
    # the judge flag are still forwarded, but the model name is never overridden.
    model_name = self._options.judge_model
    model_params: Dict[str, Any] = {}
    tools: List[ToolDefinition] = []
    if judge_config.model and judge_config.model._parameters:
        judge_parameters = judge_config.model._parameters
        # Same normalisation the agent's own tools go through.
        tools = self._extract_agent_tools(judge_parameters)
        model_params = {k: v for k, v in judge_parameters.items() if k != "tools"}

    # Prepend agent tools so the judge can call them when verifying the response
    if agent_tools:
        tools = list(agent_tools) + tools
    tool_params = {"tools": [t.to_dict() for t in tools]} if tools else {}

    judge_call_config = AIJudgeCallConfig(
        key=judge_key,
        model=ModelConfig(
            name=model_name,
            parameters={**model_params, **tool_params},
        ),
        instructions=instructions,
        messages=updated_messages,
    )
    judge_ctx = OptimizationJudgeContext(
        user_input=judge_user_input,
        current_variables=variables or {},
    )

    # Time the judge call, whether the callback is sync or async.
    _judge_start = time.monotonic()
    result = self._judge_call(
        judge_key, judge_call_config, judge_ctx, True
    )
    judge_response: OptimizationResponse = await await_if_needed(result)
    judge_duration_ms = (time.monotonic() - _judge_start) * 1000
    judge_response_str = judge_response.output
    logger.debug(
        "[Iteration %d] -> Judge response (%s): %s",
        iteration,
        judge_key,
        judge_response_str,
    )

    # Parse judge response — expect structured JSON output
    judge_identifier = optimization_judge.judge_key or judge_key
    judge_result = self._parse_judge_response(
        judge_response_str,
        judge_key,
        judge_identifier,
        iteration,
        clamp_score=False,
    )
    return dataclasses.replace(
        judge_result, duration_ms=judge_duration_ms, usage=judge_response.usage
    )
async def _evaluate_acceptance_judge(
    self,
    judge_key: str,
    optimization_judge: "OptimizationJudge",
    completion_response: str,
    iteration: int,
    reasoning_history: str,
    user_input: str,
    variables: Optional[Dict[str, Any]] = None,
    agent_tools: Optional[List[ToolDefinition]] = None,
    expected_response: Optional[str] = None,
    agent_duration_ms: Optional[float] = None,
    agent_usage: Optional[Any] = None,
) -> JudgeResult:
    """
    Score a response with a judge defined only by an acceptance statement.

    A scoring prompt is assembled from the statement and the message history,
    then optionally extended with latency notes, cost notes, the variables the
    agent saw, and the tools the judge may call. The prompt is sent through
    the judge callback with the global ``judge_model`` and the reply is parsed
    with the score clamped to [0.0, 1.0].

    :param judge_key: The key for this judge in the judges dict
    :param optimization_judge: The optimization judge configuration
    :param completion_response: The agent's completion response to evaluate
    :param iteration: Current iteration number
    :param reasoning_history: Formatted string of reasoning from previous iterations
    :param user_input: The user's question for this turn
    :param variables: The variable set that was used during agent generation
    :param agent_tools: Normalised list of tool definitions that were available to the agent
    :param expected_response: Optional ground truth expected response. When provided,
        appended to the judge's user message so it can score actual vs. expected.
    :param agent_duration_ms: Wall-clock duration of the agent call in milliseconds.
        When the acceptance statement implies a latency goal, the judge is told
        the duration and its change from the first history entry.
    :param agent_usage: Token usage from the agent call. When the acceptance statement
        implies a cost goal, the judge is told the token usage and estimated cost.
    :return: The judge result with score and rationale, plus the judge call's
        duration and token usage; a zero score when no acceptance statement is set
    """
    statement = optimization_judge.acceptance_statement
    if not statement:
        logger.error(
            "[Iteration %d] -> Judge %s has no acceptance_statement",
            iteration,
            judge_key,
        )
        return JudgeResult(score=0.0, rationale=None)

    resolved_variables = variables or {}
    resolved_agent_tools = agent_tools or []

    # Message history includes the current user question.
    message_history_text = build_message_history_text(
        self._history, "", reasoning_history, user_input
    )

    # The prompt is collected as fragments and joined once at the end.
    prompt_parts: List[str] = [
        "You are a judge that evaluates the response to the user's question.\n\n"
        "Here is the statement that you should evaluate the response against: "
        f"'{statement}'\n"
        f"Here is the history of all messages between the user and the assistant: {message_history_text}\n"
        "You should score the response based on how well it meets the acceptance statement "
        "using a score between 0.0 and 1.0.\n"
        "A score of 0.0 means it does not match at all, while a score of 1.0 means it matches perfectly.\n"
        "A score of 0.3-0.7 means it matches partially, while a score of 0.7-1.0 means it matches well.\n"
        "A score of 0.0-0.3 means that it does not match well at all. "
        "You can return any value between 0.0 and 1.0.\n"
        "You should also provide a rationale for your score.\n"
        "Return your response as a JSON object with 'score' and 'rationale' fields.\n\n"
        'Example: {"score": 0.8, "rationale": "The response matches the acceptance statement well."}'
    ]

    # --- Latency section: only when a duration was measured and the statement asks for speed.
    wants_latency_note = (
        agent_duration_ms is not None
        and _acceptance_criteria_implies_duration_optimization(
            {judge_key: optimization_judge}
        )
    )
    if wants_latency_note:
        # Baseline is the first recorded turn, when there is one.
        baseline_ms = self._history[0].duration_ms if self._history else None
        prompt_parts.append(
            "\n\nThe acceptance criteria for this judge includes a latency/duration goal. "
            f"The agent's response took {agent_duration_ms:.0f}ms to generate. "
        )
        if baseline_ms is not None:
            delta_ms = agent_duration_ms - baseline_ms
            if delta_ms < 0:
                direction = "faster"
            else:
                direction = "slower"
            prompt_parts.append(
                f"The baseline duration (first iteration) was {baseline_ms:.0f}ms. "
                f"This response was {abs(delta_ms):.0f}ms {direction} than the baseline. "
            )
        prompt_parts.append(
            "In your rationale, state the duration and any change from baseline. "
            "If the latency goal is not yet met, include specific, actionable suggestions "
            "for how the agent's instructions or model choice could be changed to reduce "
            "response time — for example: switching to a faster model, shortening the "
            "system prompt, or removing instructions that cause multi-step reasoning. "
            "These suggestions will be used directly to generate the next variation."
        )

    # --- Cost section: only when the statement asks for cost and a cost could be estimated.
    if _acceptance_criteria_implies_cost_optimization({judge_key: optimization_judge}):
        current_cost = estimate_cost(
            agent_usage,
            _find_model_config(self._current_model or "", self._model_configs),
        )
        baseline_cost = (
            self._history[0].estimated_cost_usd if self._history else None
        )
        if current_cost is not None:
            prompt_parts.append(
                "\n\nThe acceptance criteria for this judge includes a cost/token-usage goal. "
            )
            if agent_usage is not None:
                prompt_parts.append(
                    f"The agent's response used {agent_usage.input} input tokens "
                    f"and {agent_usage.output} output tokens "
                    f"(estimated cost: ${current_cost:.6f}). "
                )
            if baseline_cost is not None:
                delta = current_cost - baseline_cost
                if delta < 0:
                    direction = "less"
                else:
                    direction = "more"
                prompt_parts.append(
                    f"The baseline cost (first iteration) was ${baseline_cost:.6f}. "
                    f"This response cost ${abs(delta):.6f} {direction} than the baseline. "
                )
            prompt_parts.append(
                "In your rationale, state the token usage and cost, and any change from baseline. "
                "If the cost goal is not yet met, include specific, actionable suggestions "
                "for how the agent's instructions or model choice could be changed to reduce "
                "cost — for example: switching to a cheaper model, shortening the system prompt "
                "to reduce input tokens, removing unnecessary output instructions, or tightening "
                "response length constraints. "
                "These suggestions will be used directly to generate the next variation."
            )

    # --- Context the agent had available.
    if resolved_variables:
        prompt_parts.append(
            f"\n\nThe following variables were available to the agent: {json.dumps(resolved_variables)}"
        )
    if resolved_agent_tools:
        tool_names = [tool.name for tool in resolved_agent_tools]
        prompt_parts.append(
            "\n\nThe following tools were available to the agent and "
            f"may be called by you to verify the response: {json.dumps(tool_names)}."
            "\nIf verifying the response requires looking up external information, "
            "call the appropriate tool before scoring. "
            "You should only call the tools for the most recent response, "
            "and should only call the tools if necessary. "
            "Assume that previous feedback will have addressed bad tool call results from prior iterations."
        )
    instructions = "".join(prompt_parts)

    judge_user_input = f"Here is the response to evaluate: {completion_response}"
    if expected_response is not None:
        judge_user_input += (
            f"\n\nHere is the expected response: {expected_response}"
            "\n\nEvaluate the actual response against both the acceptance statement AND "
            "how closely it matches the expected response. Factor both into your score."
        )

    # Agent tools are passed through so the judge can invoke them for verification.
    tools: List[ToolDefinition] = list(resolved_agent_tools)
    tool_params: Dict[str, Any] = {}
    if tools:
        tool_params = {"tools": [tool.to_dict() for tool in tools]}

    judge_call_config = AIJudgeCallConfig(
        key=judge_key,
        model=ModelConfig(
            name=self._options.judge_model,
            parameters=tool_params,
        ),
        instructions=instructions,
        messages=[
            LDMessage(role="system", content=instructions),
            LDMessage(role="user", content=judge_user_input),
        ],
    )
    judge_ctx = OptimizationJudgeContext(
        user_input=judge_user_input,
        current_variables=resolved_variables,
    )

    # Time the judge call, whether the callback is sync or async.
    started_at = time.monotonic()
    pending = self._judge_call(judge_key, judge_call_config, judge_ctx, True)
    judge_response: OptimizationResponse = await await_if_needed(pending)
    judge_duration_ms = (time.monotonic() - started_at) * 1000

    judge_response_str = judge_response.output
    logger.debug(
        "[Iteration %d] -> Judge response (%s): %s",
        iteration,
        judge_key,
        judge_response_str,
    )

    # Expect structured JSON output with score and rationale.
    parsed = self._parse_judge_response(
        judge_response_str, judge_key, judge_key, iteration, clamp_score=True
    )
    return dataclasses.replace(
        parsed, duration_ms=judge_duration_ms, usage=judge_response.usage
    )
async def _get_agent_config(
    self, agent_key: str, context: Context
) -> AIAgentConfig:
    """
    Fetch the agent configuration, keeping the raw instruction template.

    agent_config() is called normally so we get a fully populated AIAgentConfig
    (including the tracker). variation() is then called separately to retrieve
    the unrendered instruction template, which is swapped in so that
    {{placeholder}} tokens are preserved for client-side interpolation.

    Side effects: stores the raw instructions in ``self._initial_instructions``,
    the tool keys found on the variation in ``self._initial_tool_keys``, and
    hands the resulting config to ``self._initialize_class_members_from_config``.

    :param agent_key: The key for the agent to get the configuration for
    :param context: The evaluation context
    :return: AIAgentConfig with raw {{placeholder}} instruction templates intact
    :raises ValueError: If the agent has no instructions configured
    """
    try:
        agent_config = self._ldClient.agent_config(agent_key, context)
        # variation() returns the raw JSON before chevron.render(), so instructions
        # still contain {{placeholder}} tokens rather than empty strings.
        raw_variation = self._ldClient._client.variation(agent_key, context, {})
        raw_instructions = raw_variation.get(
            "instructions", agent_config.instructions
        )
        if not raw_instructions:
            raise ValueError(
                f"Agent '{agent_key}' has no instructions configured. "
                "Ensure the agent flag has instructions set before running an optimization."
            )
        self._initial_instructions = raw_instructions

        # dict.get() only applies its default when the key is missing; an explicit
        # `"tools": null` in the variation yields None, which must not be iterated.
        raw_tools = raw_variation.get("tools") or []
        # Entries that are not dicts, or that lack a "key", are ignored.
        self._initial_tool_keys = [
            t["key"]
            for t in raw_tools
            if isinstance(t, dict) and "key" in t
        ]

        agent_config = dataclasses.replace(
            agent_config, instructions=raw_instructions
        )
        self._initialize_class_members_from_config(agent_config)
        return agent_config
    except Exception:
        # Log with traceback for diagnostics, then let the caller handle the failure.
        logger.exception("[Optimization] -> Failed to get agent configuration")
        raise
async def optimize_from_options(
    self, agent_key: str, options: OptimizationOptions
) -> Any:
    """Run an optimization for the given agent using the supplied options.

    One evaluation context is picked at random from ``options.context_choices``
    and used to load the agent configuration before the optimization runs.
    When ``options.auto_commit`` is set, the prerequisites are validated up
    front, and ``self._commit_variation`` is invoked afterwards if the run was
    recorded as successful and a succeeded context is available.

    :param agent_key: Identifier of the agent to optimize.
    :param options: Optimization options.
    :return: Optimization result.
    :raises ValueError: If auto_commit is requested without an API key or
        without ``project_key`` on the options.
    """
    if options.auto_commit:
        # Fail fast: committing is impossible without credentials and a project.
        if not self._has_api_key:
            raise ValueError(
                "auto_commit requires LAUNCHDARKLY_API_KEY to be set"
            )
        if not options.project_key:
            raise ValueError(
                "auto_commit requires project_key to be set on OptimizationOptions"
            )

    self._agent_key = agent_key
    chosen_context = random.choice(options.context_choices)
    loaded_config = await self._get_agent_config(agent_key, chosen_context)
    outcome = await self._run_optimization(loaded_config, options)

    # Nothing to commit unless auto-commit is on and the run left a winning context.
    if (
        not options.auto_commit
        or not self._last_run_succeeded
        or not self._last_succeeded_context
    ):
        return outcome

    self._commit_variation(
        self._last_succeeded_context,
        project_key=options.project_key,  # type: ignore[arg-type]
        ai_config_key=agent_key,
        output_key=options.output_key,
        base_url=options.base_url,
    )
    return outcome
async def optimize_from_ground_truth_options(
self, agent_key: str, options: GroundTruthOptimizationOptions
) -> List[OptimizationContext]:
"""Execute a ground truth optimization on the given agent.
Unlike optimize_from_options (which tests random choices until one passes),
this path evaluates all N ground truth samples in each attempt and only
succeeds when every sample passes its judges. A new variation is generated
whenever any sample fails, and all N samples are re-evaluated from scratch
with the updated configuration, up to max_attempts.
:param agent_key: Identifier of the agent to optimize.
:param options: Ground truth optimization options including the ordered sample list.
:return: List of OptimizationContexts from the final attempt (one per sample).
"""
if options.auto_commit:
if not self._has_api_key:
raise ValueError(
"auto_commit requires LAUNCHDARKLY_API_KEY to be set"
)
if not options.project_key:
raise ValueError(
"auto_commit requires project_key to be set on GroundTruthOptimizationOptions"
)
self._agent_key = agent_key
context = random.choice(options.context_choices)
agent_config = await self._get_agent_config(agent_key, context)
result = await self._run_ground_truth_optimization(agent_config, options)
if options.auto_commit and self._last_run_succeeded and self._last_succeeded_context:
self._commit_variation(
self._last_succeeded_context,
project_key=options.project_key, # type: ignore[arg-type]
ai_config_key=agent_key,
output_key=options.output_key,
base_url=options.base_url,
)