Skip to content

Commit af4ec5d

Browse files
committed
changes cost & latency optimization to post-process
1 parent 509240f commit af4ec5d

4 files changed

Lines changed: 409 additions & 127 deletions

File tree

packages/optimization/src/ldai_optimizer/client.py

Lines changed: 234 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
build_message_history_text,
5454
build_new_variation_prompt,
5555
build_reasoning_history,
56+
build_token_latency_variation_prompt,
5657
)
5758
from ldai_optimizer.util import (
5859
RedactionFilter,
@@ -222,6 +223,7 @@ def __init__(self, ldClient: LDAIClient) -> None:
222223
self._total_token_usage: int = 0
223224
self._model_configs: List[Dict[str, Any]] = []
224225
self._last_batch_size: int = 1
226+
self._in_cost_latency_phase: bool = False
225227

226228
if os.environ.get("LAUNCHDARKLY_API_KEY"):
227229
self._has_api_key = True
@@ -904,7 +906,8 @@ async def _evaluate_acceptance_judge(
904906
)
905907

906908
if (
907-
agent_duration_ms is not None
909+
self._in_cost_latency_phase
910+
and agent_duration_ms is not None
908911
and bool(self._options.latency_optimization)
909912
):
910913
baseline_ms = self._baseline_duration_ms
@@ -922,13 +925,12 @@ async def _evaluate_acceptance_judge(
922925
instructions += (
923926
"In your rationale, state the duration and any change from baseline. "
924927
"If the latency goal is not yet met, include specific, actionable suggestions "
925-
"for how the agent's instructions or model choice could be changed to reduce "
926-
"response time — for example: switching to a faster model, shortening the "
927-
"system prompt, or removing instructions that cause multi-step reasoning. "
928+
"for how the model choice or parameters could be changed to reduce "
929+
"response time — for example: switching to a faster model or reducing max_tokens. "
928930
"These suggestions will be used directly to generate the next variation."
929931
)
930932

931-
if bool(self._options.token_optimization):
933+
if self._in_cost_latency_phase and bool(self._options.token_optimization):
932934
current_cost = estimate_cost(
933935
agent_usage,
934936
_find_model_config(self._current_model or "", self._model_configs),
@@ -954,10 +956,8 @@ async def _evaluate_acceptance_judge(
954956
instructions += (
955957
"In your rationale, state the token usage and cost, and any change from baseline. "
956958
"If the cost goal is not yet met, include specific, actionable suggestions "
957-
"for how the agent's instructions or model choice could be changed to reduce "
958-
"cost — for example: switching to a cheaper model, shortening the system prompt "
959-
"to reduce input tokens, removing unnecessary output instructions, or tightening "
960-
"response length constraints. "
959+
"for how the model choice or parameters could be changed to reduce "
960+
"cost — for example: switching to a cheaper model or reducing max_tokens. "
961961
"These suggestions will be used directly to generate the next variation."
962962
)
963963

@@ -1366,13 +1366,6 @@ async def _run_ground_truth_optimization(
13661366
else:
13671367
sample_passed = self._evaluate_response(optimize_context)
13681368

1369-
sample_passed, optimize_context = self._apply_duration_gate(sample_passed, optimize_context)
1370-
sample_passed, optimize_context = self._apply_cost_gate(sample_passed, optimize_context)
1371-
1372-
# Flush gate scores to the API for this sample. Without this,
1373-
# the next sample's "generating" event closes out this record
1374-
# with a status-only PATCH before gate scores are sent, so only
1375-
# the last sample would ever show latency/cost gate entries.
13761369
self._safe_status_update("evaluating", optimize_context, linear_iter)
13771370

13781371
if not sample_passed:
@@ -1462,6 +1455,19 @@ async def _run_ground_truth_optimization(
14621455
logger.exception(
14631456
"[GT Attempt %d] -> on_passing_result callback failed", attempt
14641457
)
1458+
# Phase 2: optimize model/params on the frozen winning variation.
1459+
if (
1460+
self._options.latency_optimization
1461+
or self._options.token_optimization
1462+
) and not self._is_token_limit_exceeded():
1463+
phase1_winner = self._last_succeeded_context
1464+
await self._run_cost_latency_phase(
1465+
last_ctx,
1466+
last_ctx.iteration,
1467+
)
1468+
if self._last_succeeded_context is None:
1469+
self._last_run_succeeded = True
1470+
self._last_succeeded_context = phase1_winner
14651471
return attempt_results
14661472

14671473
# We've hit max attempts for the batches, bail at this point
@@ -1561,7 +1567,18 @@ def _apply_new_variation_response(
15611567
f"Received fields: {list(response_data.keys())}"
15621568
)
15631569

1564-
self._current_instructions = response_data["current_instructions"]
1570+
new_instructions = response_data["current_instructions"]
1571+
1572+
if self._in_cost_latency_phase:
1573+
if new_instructions != self._current_instructions:
1574+
logger.warning(
1575+
"[Iteration %d] -> Phase 2 (cost/latency): LLM attempted to change instructions; "
1576+
"restoring frozen winning variation instructions to enforce content lock.",
1577+
iteration,
1578+
)
1579+
new_instructions = self._current_instructions
1580+
1581+
self._current_instructions = new_instructions
15651582

15661583
# Post-process: replace any leaked variable values back to {{key}} form.
15671584
# This is a deterministic safety net for when the LLM ignores the prompt
@@ -1690,22 +1707,24 @@ async def _generate_new_variation(
16901707
)
16911708
self._safe_status_update("generating variation", status_ctx, iteration)
16921709

1693-
optimize_for_duration = bool(self._options.latency_optimization)
1694-
optimize_for_cost = bool(self._options.token_optimization)
1695-
quality_already_passing = self._all_judges_passing()
1696-
instructions = build_new_variation_prompt(
1697-
self._history,
1698-
self._options.judges,
1699-
self._current_model,
1700-
self._current_instructions,
1701-
self._current_parameters,
1702-
self._options.model_choices,
1703-
self._options.variable_choices,
1704-
self._initial_instructions,
1705-
optimize_for_duration=optimize_for_duration,
1706-
optimize_for_cost=optimize_for_cost,
1707-
quality_already_passing=quality_already_passing,
1708-
)
1710+
if self._in_cost_latency_phase:
1711+
instructions = build_token_latency_variation_prompt(
1712+
self._history,
1713+
self._options.model_choices,
1714+
optimize_for_latency=bool(self._options.latency_optimization),
1715+
optimize_for_cost=bool(self._options.token_optimization),
1716+
)
1717+
else:
1718+
instructions = build_new_variation_prompt(
1719+
self._history,
1720+
self._options.judges,
1721+
self._current_model,
1722+
self._current_instructions,
1723+
self._current_parameters,
1724+
self._options.model_choices,
1725+
self._options.variable_choices,
1726+
self._initial_instructions,
1727+
)
17091728

17101729
# Create a flat history list (without nested history) to avoid exponential growth
17111730
flat_history = [prev_ctx.copy_without_history() for prev_ctx in self._history]
@@ -2523,6 +2542,175 @@ def _handle_failure(
25232542
)
25242543
return optimize_context
25252544

2545+
def _pick_best_candidate(
2546+
self, candidates: List[OptimizationContext]
2547+
) -> OptimizationContext:
2548+
"""Select the best Phase 2 candidate by normalized combined cost/latency score.
2549+
2550+
Ranks all candidates using the sum of their normalized metrics:
2551+
score = (duration_ms / baseline_duration_ms) + (estimated_cost_usd / baseline_cost_usd)
2552+
2553+
Terms whose baseline or measurement is unavailable are omitted from the sum.
2554+
The candidate with the lowest score wins. If scores are equal, the first
2555+
candidate (earliest iteration) is returned.
2556+
2557+
:param candidates: Non-empty list of passing Phase 2 OptimizationContexts.
2558+
:return: The best-scoring candidate.
2559+
"""
2560+
def _score(ctx: OptimizationContext) -> float:
2561+
total = 0.0
2562+
if (
2563+
ctx.duration_ms is not None
2564+
and self._baseline_duration_ms is not None
2565+
and self._baseline_duration_ms > 0
2566+
):
2567+
total += ctx.duration_ms / self._baseline_duration_ms
2568+
if (
2569+
ctx.estimated_cost_usd is not None
2570+
and self._baseline_cost_usd is not None
2571+
and self._baseline_cost_usd > 0
2572+
):
2573+
total += ctx.estimated_cost_usd / self._baseline_cost_usd
2574+
return total
2575+
2576+
return min(candidates, key=_score)
2577+
2578+
async def _run_cost_latency_phase(
    self,
    winning_ctx: OptimizationContext,
    last_iteration: int,
) -> None:
    """Run Phase 2: re-evaluate the frozen winning variation under each model.

    The instructions, parameters, variables and user input are taken from the
    Phase 1 winner and reset before every turn, so only the model varies
    between turns. One turn is executed per distinct model (the Phase 1
    winner's model first, then each entry of ``model_choices`` not yet seen);
    if that yields a single model it is run twice so at least two turns are
    executed. Turns that pass quality evaluation and the duration/cost gates
    become candidates, and the best one is chosen by
    ``_pick_best_candidate``.

    All non-candidates are reported via ``_handle_failure`` before the best
    candidate (if any) is reported via ``_handle_success``.

    ``self._in_cost_latency_phase`` is True for the duration of this method
    and is always reset to False on exit, including when an exception
    propagates, so a failed Phase 2 cannot leave the optimizer stuck in
    Phase 2 mode.

    :param winning_ctx: The Phase 1 winning OptimizationContext.
    :param last_iteration: The last iteration number from Phase 1; Phase 2
        continues from last_iteration + 1.
    """
    self._in_cost_latency_phase = True
    try:
        self._history = [winning_ctx]
        self._current_instructions = winning_ctx.current_instructions
        self._current_parameters = winning_ctx.current_parameters.copy()
        self._current_model = winning_ctx.current_model

        frozen_variables = winning_ctx.current_variables
        frozen_user_input = winning_ctx.user_input

        # Deterministic, de-duplicated evaluation order: the Phase 1 winner's
        # model first, then each model_choice in its original order.
        # dict.fromkeys preserves first-seen order while dropping repeats.
        phase1_model = winning_ctx.current_model or ""
        ordered_models: List[str] = list(
            dict.fromkeys([phase1_model, *(self._options.model_choices or [])])
        )
        # Ensure at least 2 iterations by repeating the only model once.
        if len(ordered_models) < 2:
            ordered_models.append(ordered_models[-1])

        candidates: List[OptimizationContext] = []
        non_candidates: List[OptimizationContext] = []
        max_iters = len(ordered_models)
        iteration = last_iteration

        for i, model in enumerate(ordered_models):
            iteration += 1

            # Parameters are reset to the Phase 1 winner's values every turn
            # so only the model differs between turns.
            self._current_model = model
            self._current_parameters = winning_ctx.current_parameters.copy()
            logger.info(
                "[Phase 2 Iter %d] -> Evaluating model '%s' (%d/%d)",
                iteration,
                self._current_model,
                i + 1,
                max_iters,
            )

            ctx = self._create_optimization_context(
                iteration=iteration,
                variables=frozen_variables,
                user_input=frozen_user_input,
            )
            self._safe_status_update("generating", ctx, iteration)
            try:
                ctx = await self._execute_agent_turn(ctx, iteration)
            except Exception:
                # Best-effort: one failing model must not abort the whole
                # phase. exc_info keeps the cause visible in the logs.
                logger.warning(
                    "[Phase 2 Iter %d] -> Agent call failed (model=%s); "
                    "skipping this model and trying the next",
                    iteration,
                    self._current_model,
                    exc_info=True,
                )
                non_candidates.append(ctx)
                continue
            self._accumulate_tokens(ctx)
            ctx = dataclasses.replace(
                ctx, accumulated_token_usage=self._total_token_usage
            )

            quality_passed = self._evaluate_response(ctx)
            quality_passed, ctx = self._apply_duration_gate(quality_passed, ctx)
            quality_passed, ctx = self._apply_cost_gate(quality_passed, ctx)

            if self._is_token_limit_exceeded():
                logger.warning(
                    "[Phase 2 Iter %d] -> Token limit exceeded; stopping Phase 2 early",
                    iteration,
                )
                # The turn that crossed the limit is never a candidate,
                # regardless of its pass/fail result.
                non_candidates.append(ctx)
                break

            if quality_passed:
                logger.info(
                    "[Phase 2 Iter %d] -> Passed quality + gates — added to candidates",
                    iteration,
                )
                candidates.append(ctx)
            else:
                logger.info(
                    "[Phase 2 Iter %d] -> Failed quality or gates", iteration
                )
                non_candidates.append(ctx)

            # The final turn gets no "turn completed" update here; the
            # reporting below is the last status sent for the phase.
            if i < max_iters - 1:
                self._safe_status_update("turn completed", ctx, iteration)

        # Report results: failures first, then the single best candidate, so
        # the success is the last outcome recorded.
        for failed_ctx in non_candidates:
            self._handle_failure(failed_ctx, failed_ctx.iteration)

        if candidates:
            best = self._pick_best_candidate(candidates)
            self._handle_success(best, best.iteration)
            logger.info(
                "[Phase 2] -> Best candidate selected: model=%s, duration_ms=%s, cost=%s",
                best.current_model,
                f"{best.duration_ms:.0f}ms" if best.duration_ms is not None else "N/A",
                f"${best.estimated_cost_usd:.6f}" if best.estimated_cost_usd is not None else "N/A",
            )
        else:
            logger.info(
                "[Phase 2] -> No candidates passed; keeping Phase 1 winner as final result"
            )
    finally:
        # Always leave Phase 2 mode, even if evaluation or reporting raised.
        self._in_cost_latency_phase = False
2713+
25262714
def _commit_variation(
25272715
self,
25282716
optimize_context: OptimizationContext,
@@ -2595,6 +2783,8 @@ def _commit_variation(
25952783
"instructions": optimize_context.current_instructions,
25962784
"modelConfigKey": model_config_key,
25972785
}
2786+
if optimize_context.current_parameters:
2787+
payload["parameters"] = optimize_context.current_parameters
25982788
if self._initial_tool_keys:
25992789
payload["toolKeys"] = list(self._initial_tool_keys)
26002790
if self._initial_model_custom:
@@ -2741,9 +2931,6 @@ async def _run_validation_phase(
27412931
else:
27422932
sample_passed = self._evaluate_response(val_ctx)
27432933

2744-
sample_passed, val_ctx = self._apply_duration_gate(sample_passed, val_ctx)
2745-
sample_passed, val_ctx = self._apply_cost_gate(sample_passed, val_ctx)
2746-
27472934
if self._is_token_limit_exceeded():
27482935
logger.error(
27492936
"[Validation %d/%d] -> Token limit exceeded (total=%d)",
@@ -2869,9 +3056,6 @@ async def _run_optimization(
28693056
iteration,
28703057
)
28713058

2872-
initial_passed, optimize_context = self._apply_duration_gate(initial_passed, optimize_context)
2873-
initial_passed, optimize_context = self._apply_cost_gate(initial_passed, optimize_context)
2874-
28753059
# Token limit check after pass/fail evaluation so the persisted record
28763060
# correctly reflects whether the iteration passed before stopping the run.
28773061
if self._is_token_limit_exceeded():
@@ -2887,7 +3071,17 @@ async def _run_optimization(
28873071
optimize_context, iteration
28883072
)
28893073
if all_valid:
2890-
return self._handle_success(optimize_context, iteration)
3074+
self._handle_success(optimize_context, iteration)
3075+
phase1_winner = self._last_succeeded_context
3076+
if (
3077+
self._options.latency_optimization
3078+
or self._options.token_optimization
3079+
) and not self._is_token_limit_exceeded():
3080+
await self._run_cost_latency_phase(optimize_context, iteration)
3081+
if self._last_succeeded_context is None:
3082+
self._last_run_succeeded = True
3083+
self._last_succeeded_context = phase1_winner
3084+
return self._last_succeeded_context
28913085
if self._is_token_limit_exceeded():
28923086
return self._handle_failure(last_ctx, iteration)
28933087
# Validation failed — treat as a normal failed attempt.

packages/optimization/src/ldai_optimizer/dataclasses.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ class OptimizationJudgeContext:
316316
"turn completed",
317317
"success",
318318
"failure",
319+
"optimizing cost/latency",
319320
]
320321

321322

0 commit comments

Comments
 (0)