Skip to content
140 changes: 133 additions & 7 deletions packages/optimization/src/ldai_optimizer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
LDApiClient,
)
from ldai_optimizer.prompts import (
_acceptance_criteria_implies_cost_optimization,
_acceptance_criteria_implies_duration_optimization,
build_message_history_text,
build_new_variation_prompt,
Expand All @@ -57,6 +58,7 @@
from ldai_optimizer.util import (
RedactionFilter,
await_if_needed,
estimate_cost,
extract_json_from_response,
generate_slug,
interpolate_variables,
Expand Down Expand Up @@ -128,6 +130,11 @@ def _compute_validation_count(pool_size: int) -> int:
# under 80% of the baseline — i.e. at least 20% improvement.
_DURATION_TOLERANCE = 0.80

# Cost gate: a candidate must cost at most this fraction of the baseline
# (history[0].estimated_cost_usd) to pass when acceptance criteria imply a
# cost reduction goal. 0.80 means at least 20% cheaper than the baseline.
_COST_TOLERANCE = 0.80

# Maps SDK status strings to the API status/activity values expected by
# agent_optimization_result records. Defined at module level to avoid
# allocating the dict on every on_status_update invocation.
Expand Down Expand Up @@ -160,6 +167,7 @@ def __init__(self, ldClient: LDAIClient) -> None:
self._last_optimization_result_id: Optional[str] = None
self._initial_tool_keys: List[str] = []
self._total_token_usage: int = 0
self._model_configs: List[Dict[str, Any]] = []

if os.environ.get("LAUNCHDARKLY_API_KEY"):
self._has_api_key = True
Expand Down Expand Up @@ -392,6 +400,7 @@ async def _call_judges(
agent_tools: Optional[List[ToolDefinition]] = None,
expected_response: Optional[str] = None,
agent_duration_ms: Optional[float] = None,
agent_usage: Optional[Any] = None,
) -> Dict[str, JudgeResult]:
"""
Call all judges in parallel (auto-path).
Expand All @@ -411,6 +420,8 @@ async def _call_judges(
:param agent_duration_ms: Wall-clock duration of the agent call in milliseconds.
Forwarded to acceptance judges whose statement implies a latency goal so they
can mention the duration change in their rationale.
:param agent_usage: Token usage from the agent call. Forwarded to acceptance judges
whose statement implies a cost goal so they can mention token usage in their rationale.
:return: Dictionary of judge results (score and rationale)
"""
if not self._options.judges:
Expand Down Expand Up @@ -464,6 +475,7 @@ async def _call_judges(
agent_tools=resolved_agent_tools,
expected_response=expected_response,
agent_duration_ms=agent_duration_ms,
agent_usage=agent_usage,
)
judge_results[judge_key] = result

Expand Down Expand Up @@ -682,6 +694,7 @@ async def _evaluate_acceptance_judge(
agent_tools: Optional[List[ToolDefinition]] = None,
expected_response: Optional[str] = None,
agent_duration_ms: Optional[float] = None,
agent_usage: Optional[Any] = None,
) -> JudgeResult:
"""
Evaluate using an acceptance statement judge.
Expand All @@ -699,6 +712,8 @@ async def _evaluate_acceptance_judge(
:param agent_duration_ms: Wall-clock duration of the agent call in milliseconds.
When the acceptance statement implies a latency goal, the judge is instructed
to mention the duration change in its rationale.
:param agent_usage: Token usage from the agent call. When the acceptance statement
implies a cost goal, the judge is instructed to mention token usage and cost.
:return: The judge result with score and rationale
"""
if not optimization_judge.acceptance_statement:
Expand Down Expand Up @@ -757,8 +772,50 @@ async def _evaluate_acceptance_judge(
f"This response was {abs(delta_ms):.0f}ms {direction} than the baseline. "
)
instructions += (
"Please mention the duration and any change from baseline in your rationale."
"In your rationale, state the duration and any change from baseline. "
"If the latency goal is not yet met, include specific, actionable suggestions "
"for how the agent's instructions or model choice could be changed to reduce "
"response time — for example: switching to a faster model, shortening the "
"system prompt, or removing instructions that cause multi-step reasoning. "
"These suggestions will be used directly to generate the next variation."
)

if _acceptance_criteria_implies_cost_optimization({judge_key: optimization_judge}):
current_cost = estimate_cost(
agent_usage,
_find_model_config(self._current_model or "", self._model_configs),
)
baseline_cost = (
self._history[0].estimated_cost_usd
if self._history and self._history[0].estimated_cost_usd is not None
else None
)
if current_cost is not None:
instructions += (
f"\n\nThe acceptance criteria for this judge includes a cost/token-usage goal. "
)
if agent_usage is not None:
instructions += (
f"The agent's response used {agent_usage.input} input tokens "
f"and {agent_usage.output} output tokens "
f"(estimated cost: ${current_cost:.6f}). "
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Token count f-string may print None values

Low Severity

When agent_usage.input or agent_usage.output is None (which TokenUsage allows — estimate_cost explicitly guards against this), the f-string at this location would produce text like "used None input tokens and 40 output tokens" in the judge instructions. This happens because estimate_cost can return a non-None cost using only the non-None token count, but the f-string unconditionally formats both fields.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit d267832. Configure here.

if baseline_cost is not None:
delta = current_cost - baseline_cost
direction = "less" if delta < 0 else "more"
instructions += (
f"The baseline cost (first iteration) was ${baseline_cost:.6f}. "
f"This response cost ${abs(delta):.6f} {direction} than the baseline. "
)
instructions += (
"In your rationale, state the token usage and cost, and any change from baseline. "
"If the cost goal is not yet met, include specific, actionable suggestions "
"for how the agent's instructions or model choice could be changed to reduce "
"cost — for example: switching to a cheaper model, shortening the system prompt "
"to reduce input tokens, removing unnecessary output instructions, or tightening "
"response length constraints. "
"These suggestions will be used directly to generate the next variation."
)

if resolved_variables:
instructions += f"\n\nThe following variables were available to the agent: {json.dumps(resolved_variables)}"
Expand Down Expand Up @@ -1082,6 +1139,11 @@ async def _run_ground_truth_optimization(
):
sample_passed = self._evaluate_duration(optimize_context)

if sample_passed and _acceptance_criteria_implies_cost_optimization(
self._options.judges
):
sample_passed = self._evaluate_cost(optimize_context)

if not sample_passed:
logger.info(
"[GT Attempt %d] -> Sample %d/%d FAILED",
Expand Down Expand Up @@ -1227,12 +1289,19 @@ def _apply_new_variation_response(
# This is a deterministic safety net for when the LLM ignores the prompt
# instructions and hardcodes a concrete value (e.g. "user-123") instead
# of the placeholder ("{{user_id}}").
# Only check the variables that were actually used for this invocation so
# we don't spuriously replace values that happen to appear in other choices.
active_variables = (
[variation_ctx.current_variables]
if variation_ctx.current_variables
else self._options.variable_choices
)
self._current_instructions, placeholder_warnings = restore_variable_placeholders(
self._current_instructions,
self._options.variable_choices,
active_variables,
)
for msg in placeholder_warnings:
logger.warning("[Iteration %d] -> %s", iteration, msg)
logger.debug("[Iteration %d] -> %s", iteration, msg)

self._current_parameters = response_data["current_parameters"]

Expand Down Expand Up @@ -1321,6 +1390,9 @@ async def _generate_new_variation(
optimize_for_duration = _acceptance_criteria_implies_duration_optimization(
self._options.judges
)
optimize_for_cost = _acceptance_criteria_implies_cost_optimization(
self._options.judges
)
instructions = build_new_variation_prompt(
self._history,
self._options.judges,
Expand All @@ -1331,6 +1403,7 @@ async def _generate_new_variation(
self._options.variable_choices,
self._initial_instructions,
optimize_for_duration=optimize_for_duration,
optimize_for_cost=optimize_for_cost,
)

# Create a flat history list (without nested history) to avoid exponential growth
Expand Down Expand Up @@ -1424,6 +1497,7 @@ async def optimize_from_config(
model_configs = api_client.get_model_configs(options.project_key)
except Exception as exc:
logger.debug("Could not pre-fetch model configs: %s", exc)
self._model_configs = model_configs

context = random.choice(options.context_choices)
# _get_agent_config calls _initialize_class_members_from_config internally;
Expand Down Expand Up @@ -1793,18 +1867,24 @@ async def _execute_agent_turn(
agent_tools=agent_tools,
expected_response=expected_response,
agent_duration_ms=agent_duration_ms,
agent_usage=agent_response.usage,
)

# Build the fully-populated result context before firing the evaluating event so
# the PATCH includes scores, generationLatency, and completionResponse. This is
# particularly important for non-final GT samples which receive no further status
# events — without this, those fields would never be written to their API records.
agent_cost = estimate_cost(
agent_response.usage,
_find_model_config(self._current_model or "", self._model_configs),
)
result_ctx = dataclasses.replace(
optimize_context,
completion_response=completion_response,
scores=scores,
duration_ms=agent_duration_ms,
usage=agent_response.usage,
estimated_cost_usd=agent_cost,
)

if self._options.judges:
Expand All @@ -1829,13 +1909,13 @@ def _accumulate_tokens(self, optimize_context: OptimizationContext) -> None:
def _is_token_limit_exceeded(self) -> bool:
"""Return True if the accumulated token usage has met or exceeded the configured limit.

Returns False when no token limit is set so callers can use this as a
simple guard without needing to check for ``None`` themselves.
Returns False when no token limit is set, or when the limit is 0 (which is
treated as "no limit" — a sentinel value meaning the field was left unset).

:return: True if token limit is set and ``_total_token_usage >= token_limit``.
:return: True if a positive token limit is set and ``_total_token_usage >= token_limit``.
"""
limit: Optional[int] = getattr(self._options, "token_limit", None)
return limit is not None and self._total_token_usage >= limit
return bool(limit) and self._total_token_usage >= limit

def _evaluate_response(self, optimize_context: OptimizationContext) -> bool:
"""
Expand Down Expand Up @@ -1896,6 +1976,42 @@ def _evaluate_duration(self, optimize_context: OptimizationContext) -> bool:
)
return passed

def _evaluate_cost(self, optimize_context: OptimizationContext) -> bool:
"""
Check whether the candidate's estimated cost meets the improvement target vs. the baseline.

The baseline is history[0].estimated_cost_usd — the very first completed iteration,
representing the original unoptimized configuration's cost. The candidate must be
at least _COST_TOLERANCE cheaper (default: 20% improvement).

The cost value is in USD when model pricing data is available, or raw total token
count as a proxy when pricing is absent. Both are comparable relative to their
own baselines.

Returns True without blocking when no baseline is available (empty history or
history[0].estimated_cost_usd is None), or when the candidate's cost was not
captured. This avoids penalising configurations when cost data is missing.

:param optimize_context: The completed turn context containing estimated_cost_usd
:return: True if the cost requirement is met or cannot be checked
"""
if not self._history or self._history[0].estimated_cost_usd is None:
return True
if optimize_context.estimated_cost_usd is None:
return True
baseline = self._history[0].estimated_cost_usd
passed = optimize_context.estimated_cost_usd < baseline * _COST_TOLERANCE
Comment thread
cursor[bot] marked this conversation as resolved.
if not passed:
logger.warning(
"[Iteration %d] -> Cost check failed: %.6f >= baseline %.6f * %.0f%% (%.6f)",
optimize_context.iteration,
optimize_context.estimated_cost_usd,
baseline,
_COST_TOLERANCE * 100,
baseline * _COST_TOLERANCE,
)
return passed

def _handle_success(
self, optimize_context: OptimizationContext, iteration: int
) -> Any:
Expand Down Expand Up @@ -2174,6 +2290,11 @@ async def _run_validation_phase(
):
sample_passed = self._evaluate_duration(val_ctx)

if sample_passed and _acceptance_criteria_implies_cost_optimization(
self._options.judges
):
sample_passed = self._evaluate_cost(val_ctx)

last_ctx = val_ctx

if not sample_passed:
Expand Down Expand Up @@ -2298,6 +2419,11 @@ async def _run_optimization(
):
initial_passed = self._evaluate_duration(optimize_context)

if initial_passed and _acceptance_criteria_implies_cost_optimization(
self._options.judges
):
initial_passed = self._evaluate_cost(optimize_context)

if initial_passed:
all_valid, last_ctx = await self._run_validation_phase(
optimize_context, iteration
Expand Down
3 changes: 3 additions & 0 deletions packages/optimization/src/ldai_optimizer/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ class OptimizationContext:
iteration: int = 0 # current iteration number
duration_ms: Optional[float] = None # wall-clock time for the agent call in milliseconds
usage: Optional[TokenUsage] = None # token usage reported by the agent for this iteration
estimated_cost_usd: Optional[float] = None # estimated cost; USD when pricing available, else total tokens

def copy_without_history(self) -> OptimizationContext:
"""
Expand All @@ -236,6 +237,7 @@ def copy_without_history(self) -> OptimizationContext:
iteration=self.iteration,
duration_ms=self.duration_ms,
usage=self.usage,
estimated_cost_usd=self.estimated_cost_usd,
)

def to_json(self) -> Dict[str, Any]:
Expand All @@ -261,6 +263,7 @@ def to_json(self) -> Dict[str, Any]:
"history": history_list,
"iteration": self.iteration,
"duration_ms": self.duration_ms,
"estimated_cost_usd": self.estimated_cost_usd,
}
if self.usage is not None:
result["usage"] = {
Expand Down
Loading
Loading