Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 243 additions & 40 deletions packages/optimization/src/ldai_optimizer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
build_message_history_text,
build_new_variation_prompt,
build_reasoning_history,
build_token_latency_variation_prompt,
)
from ldai_optimizer.util import (
RedactionFilter,
Expand Down Expand Up @@ -222,6 +223,7 @@ def __init__(self, ldClient: LDAIClient) -> None:
self._total_token_usage: int = 0
self._model_configs: List[Dict[str, Any]] = []
self._last_batch_size: int = 1
self._in_cost_latency_phase: bool = False

if os.environ.get("LAUNCHDARKLY_API_KEY"):
self._has_api_key = True
Expand Down Expand Up @@ -904,7 +906,8 @@ async def _evaluate_acceptance_judge(
)

if (
agent_duration_ms is not None
self._in_cost_latency_phase
and agent_duration_ms is not None
and bool(self._options.latency_optimization)
):
baseline_ms = self._baseline_duration_ms
Expand All @@ -922,13 +925,12 @@ async def _evaluate_acceptance_judge(
instructions += (
"In your rationale, state the duration and any change from baseline. "
"If the latency goal is not yet met, include specific, actionable suggestions "
"for how the agent's instructions or model choice could be changed to reduce "
"response time — for example: switching to a faster model, shortening the "
"system prompt, or removing instructions that cause multi-step reasoning. "
"for how the model choice or parameters could be changed to reduce "
"response time — for example: switching to a faster model or reducing max_tokens. "
"These suggestions will be used directly to generate the next variation."
)

if bool(self._options.token_optimization):
if self._in_cost_latency_phase and bool(self._options.token_optimization):
current_cost = estimate_cost(
agent_usage,
_find_model_config(self._current_model or "", self._model_configs),
Expand All @@ -954,10 +956,8 @@ async def _evaluate_acceptance_judge(
instructions += (
"In your rationale, state the token usage and cost, and any change from baseline. "
"If the cost goal is not yet met, include specific, actionable suggestions "
"for how the agent's instructions or model choice could be changed to reduce "
"cost — for example: switching to a cheaper model, shortening the system prompt "
"to reduce input tokens, removing unnecessary output instructions, or tightening "
"response length constraints. "
"for how the model choice or parameters could be changed to reduce "
"cost — for example: switching to a cheaper model or reducing max_tokens. "
"These suggestions will be used directly to generate the next variation."
)

Expand Down Expand Up @@ -1366,13 +1366,6 @@ async def _run_ground_truth_optimization(
else:
sample_passed = self._evaluate_response(optimize_context)

sample_passed, optimize_context = self._apply_duration_gate(sample_passed, optimize_context)
sample_passed, optimize_context = self._apply_cost_gate(sample_passed, optimize_context)

# Flush gate scores to the API for this sample. Without this,
# the next sample's "generating" event closes out this record
# with a status-only PATCH before gate scores are sent, so only
# the last sample would ever show latency/cost gate entries.
self._safe_status_update("evaluating", optimize_context, linear_iter)

if not sample_passed:
Expand Down Expand Up @@ -1462,6 +1455,25 @@ async def _run_ground_truth_optimization(
logger.exception(
"[GT Attempt %d] -> on_passing_result callback failed", attempt
)
# Phase 2: optimize model/params on the frozen winning variation.
if (
self._options.latency_optimization
or self._options.token_optimization
) and not self._is_token_limit_exceeded():
phase1_winner = self._last_succeeded_context
await self._run_cost_latency_phase(
last_ctx,
last_ctx.iteration,
)
if self._last_succeeded_context is None:
# No Phase 2 candidate won; restore the Phase 1 winner.
self._last_run_succeeded = True
self._last_succeeded_context = phase1_winner
Comment thread
cursor[bot] marked this conversation as resolved.
elif self._last_succeeded_context is not phase1_winner:
# Phase 2 selected a better model; return that context so
# callers (including auto_commit) see the actual final winner
# rather than the stale Phase 1 GT batch results.
return [self._last_succeeded_context]
return attempt_results

# We've hit max attempts for the batches, bail at this point
Expand Down Expand Up @@ -1561,7 +1573,18 @@ def _apply_new_variation_response(
f"Received fields: {list(response_data.keys())}"
)

self._current_instructions = response_data["current_instructions"]
new_instructions = response_data["current_instructions"]

if self._in_cost_latency_phase:
if new_instructions != self._current_instructions:
logger.warning(
"[Iteration %d] -> Phase 2 (cost/latency): LLM attempted to change instructions; "
"restoring frozen winning variation instructions to enforce content lock.",
iteration,
)
new_instructions = self._current_instructions

self._current_instructions = new_instructions

# Post-process: replace any leaked variable values back to {{key}} form.
# This is a deterministic safety net for when the LLM ignores the prompt
Expand Down Expand Up @@ -1690,22 +1713,24 @@ async def _generate_new_variation(
)
self._safe_status_update("generating variation", status_ctx, iteration)

optimize_for_duration = bool(self._options.latency_optimization)
optimize_for_cost = bool(self._options.token_optimization)
quality_already_passing = self._all_judges_passing()
instructions = build_new_variation_prompt(
self._history,
self._options.judges,
self._current_model,
self._current_instructions,
self._current_parameters,
self._options.model_choices,
self._options.variable_choices,
self._initial_instructions,
optimize_for_duration=optimize_for_duration,
optimize_for_cost=optimize_for_cost,
quality_already_passing=quality_already_passing,
)
if self._in_cost_latency_phase:
instructions = build_token_latency_variation_prompt(
self._history,
self._options.model_choices,
optimize_for_latency=bool(self._options.latency_optimization),
optimize_for_cost=bool(self._options.token_optimization),
)
else:
instructions = build_new_variation_prompt(
self._history,
self._options.judges,
self._current_model,
self._current_instructions,
self._current_parameters,
self._options.model_choices,
self._options.variable_choices,
self._initial_instructions,
)

# Create a flat history list (without nested history) to avoid exponential growth
flat_history = [prev_ctx.copy_without_history() for prev_ctx in self._history]
Expand Down Expand Up @@ -2523,6 +2548,178 @@ def _handle_failure(
)
return optimize_context

def _pick_best_candidate(
self, candidates: List[OptimizationContext]
) -> OptimizationContext:
"""Select the best Phase 2 candidate by normalized combined cost/latency score.

Ranks all candidates using the sum of their normalized metrics:
score = (duration_ms / baseline_duration_ms) + (estimated_cost_usd / baseline_cost_usd)

Terms whose baseline or measurement is unavailable are omitted from the sum.
The candidate with the lowest score wins. If scores are equal, the first
candidate (earliest iteration) is returned.

:param candidates: Non-empty list of passing Phase 2 OptimizationContexts.
:return: The best-scoring candidate.
"""
def _score(ctx: OptimizationContext) -> float:
total = 0.0
if (
ctx.duration_ms is not None
and self._baseline_duration_ms is not None
and self._baseline_duration_ms > 0
):
total += ctx.duration_ms / self._baseline_duration_ms
if (
ctx.estimated_cost_usd is not None
and self._baseline_cost_usd is not None
and self._baseline_cost_usd > 0
):
total += ctx.estimated_cost_usd / self._baseline_cost_usd
return total
Comment thread
cursor[bot] marked this conversation as resolved.

return min(candidates, key=_score)

async def _run_cost_latency_phase(
self,
winning_ctx: OptimizationContext,
last_iteration: int,
) -> None:
"""Run Phase 2: optimize model and parameters for cost/latency on the frozen winning variation.

The agent's content (instructions, tools) is frozen from the Phase 1 winner.
Only model and parameters may be adjusted. One turn is executed per model
choice (or at least two turns total), collecting passing candidates, then
the best is selected by combined normalized cost/latency score.

Phase 2 always uses a single turn per iteration regardless of whether the
Phase 1 run was in GT mode — the winning user_input and variables from the
Phase 1 winner are reused for every Phase 2 turn.

:param winning_ctx: The Phase 1 winning OptimizationContext.
:param last_iteration: The last iteration number from Phase 1; Phase 2
continues from last_iteration + 1.
"""
self._in_cost_latency_phase = True
self._history = [winning_ctx]
self._current_instructions = winning_ctx.current_instructions
self._current_parameters = winning_ctx.current_parameters.copy()
self._current_model = winning_ctx.current_model

frozen_variables = winning_ctx.current_variables
frozen_user_input = winning_ctx.user_input

# Build a deterministic, deduplicated list of models to evaluate:
# start with the Phase 1 winner's model, then add each model_choice
# that hasn't been seen yet. This guarantees every user-selected model
# is tried exactly once, in a predictable order.
phase1_model = winning_ctx.current_model or ""
seen_models: set = {phase1_model}
ordered_models: List[str] = [phase1_model]
for m in self._options.model_choices or []:
if m not in seen_models:
seen_models.add(m)
ordered_models.append(m)
# Ensure at least 2 iterations
while len(ordered_models) < 2:
ordered_models.append(ordered_models[-1])

candidates: List[OptimizationContext] = []
non_candidates: List[OptimizationContext] = []
max_iters = len(ordered_models)
iteration = last_iteration

for i in range(max_iters):
iteration += 1

# Cycle to the next scheduled model. Instructions and parameters
# are always reset to the Phase 1 winner's frozen values so only
# the model varies — Phase 2 verifies the winning content still
# passes under each candidate model.
self._current_model = ordered_models[i]
self._current_parameters = winning_ctx.current_parameters.copy()
logger.info(
"[Phase 2 Iter %d] -> Evaluating model '%s' (%d/%d)",
iteration,
self._current_model,
i + 1,
max_iters,
)

ctx = self._create_optimization_context(
iteration=iteration,
variables=frozen_variables,
user_input=frozen_user_input,
)
self._safe_status_update("generating", ctx, iteration)
try:
ctx = await self._execute_agent_turn(ctx, iteration)
except Exception:
logger.warning(
"[Phase 2 Iter %d] -> Agent call failed (model=%s); "
"skipping this model and trying the next",
iteration,
self._current_model,
)
non_candidates.append(ctx)
continue
Comment thread
cursor[bot] marked this conversation as resolved.
self._accumulate_tokens(ctx)
ctx = dataclasses.replace(
ctx, accumulated_token_usage=self._total_token_usage
)

quality_passed = self._evaluate_response(ctx)
quality_passed, ctx = self._apply_duration_gate(quality_passed, ctx)
quality_passed, ctx = self._apply_cost_gate(quality_passed, ctx)
Comment thread
andrewklatzke marked this conversation as resolved.

if self._is_token_limit_exceeded():
logger.warning(
"[Phase 2 Iter %d] -> Token limit exceeded; stopping Phase 2 early",
iteration,
)
non_candidates.append(ctx)
break

if quality_passed:
logger.info(
"[Phase 2 Iter %d] -> Passed quality + gates — added to candidates",
iteration,
)
candidates.append(ctx)
else:
logger.info(
"[Phase 2 Iter %d] -> Failed quality or gates", iteration
)
non_candidates.append(ctx)

if i < max_iters - 1:
self._safe_status_update("turn completed", ctx, iteration)

# Send terminal FAILED status for each non-winning model attempt.
# We use _safe_status_update directly rather than _handle_failure so that
# exploratory Phase 2 misses don't corrupt _last_run_succeeded,
# _last_succeeded_context, or trigger on_failing_result — those are
# run-level signals that should only fire if the whole optimization fails.
for failed_ctx in non_candidates:
self._safe_status_update("failure", failed_ctx, failed_ctx.iteration)

if candidates:
best = self._pick_best_candidate(candidates)
self._handle_success(best, best.iteration)
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
logger.info(
"[Phase 2] -> Best candidate selected: model=%s, duration_ms=%s, cost=%s",
best.current_model,
f"{best.duration_ms:.0f}ms" if best.duration_ms is not None else "N/A",
f"${best.estimated_cost_usd:.6f}" if best.estimated_cost_usd is not None else "N/A",
)
else:
logger.info(
"[Phase 2] -> No candidates passed; keeping Phase 1 winner as final result"
)

self._in_cost_latency_phase = False

def _commit_variation(
self,
optimize_context: OptimizationContext,
Expand Down Expand Up @@ -2595,6 +2792,8 @@ def _commit_variation(
"instructions": optimize_context.current_instructions,
"modelConfigKey": model_config_key,
}
if optimize_context.current_parameters:
payload["parameters"] = optimize_context.current_parameters
if self._initial_tool_keys:
payload["toolKeys"] = list(self._initial_tool_keys)
if self._initial_model_custom:
Expand Down Expand Up @@ -2741,9 +2940,6 @@ async def _run_validation_phase(
else:
sample_passed = self._evaluate_response(val_ctx)

sample_passed, val_ctx = self._apply_duration_gate(sample_passed, val_ctx)
sample_passed, val_ctx = self._apply_cost_gate(sample_passed, val_ctx)

if self._is_token_limit_exceeded():
logger.error(
"[Validation %d/%d] -> Token limit exceeded (total=%d)",
Expand Down Expand Up @@ -2869,9 +3065,6 @@ async def _run_optimization(
iteration,
)

initial_passed, optimize_context = self._apply_duration_gate(initial_passed, optimize_context)
initial_passed, optimize_context = self._apply_cost_gate(initial_passed, optimize_context)

# Token limit check after pass/fail evaluation so the persisted record
# correctly reflects whether the iteration passed before stopping the run.
if self._is_token_limit_exceeded():
Expand All @@ -2887,7 +3080,17 @@ async def _run_optimization(
optimize_context, iteration
)
if all_valid:
return self._handle_success(optimize_context, iteration)
self._handle_success(optimize_context, iteration)
phase1_winner = self._last_succeeded_context
if (
self._options.latency_optimization
or self._options.token_optimization
) and not self._is_token_limit_exceeded():
await self._run_cost_latency_phase(optimize_context, iteration)
Comment thread
cursor[bot] marked this conversation as resolved.
if self._last_succeeded_context is None:
self._last_run_succeeded = True
self._last_succeeded_context = phase1_winner
return self._last_succeeded_context
if self._is_token_limit_exceeded():
return self._handle_failure(last_ctx, iteration)
# Validation failed — treat as a normal failed attempt.
Expand Down
1 change: 1 addition & 0 deletions packages/optimization/src/ldai_optimizer/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ class OptimizationJudgeContext:
"turn completed",
"success",
"failure",
"optimizing cost/latency",
]


Expand Down
Loading
Loading