Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 85 additions & 29 deletions packages/optimization/src/ldai_optimizer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
LDApiClient,
)
from ldai_optimizer.prompts import (
_acceptance_criteria_implies_cost_optimization,
_acceptance_criteria_implies_duration_optimization,
build_message_history_text,
build_new_variation_prompt,
build_reasoning_history,
Expand Down Expand Up @@ -850,9 +848,7 @@ async def _evaluate_acceptance_judge(

if (
agent_duration_ms is not None
and _acceptance_criteria_implies_duration_optimization(
{judge_key: optimization_judge}
)
and bool(self._options.latency_optimization)
):
baseline_ms = self._baseline_duration_ms
instructions += (
Expand All @@ -875,7 +871,7 @@ async def _evaluate_acceptance_judge(
"These suggestions will be used directly to generate the next variation."
)

if _acceptance_criteria_implies_cost_optimization({judge_key: optimization_judge}):
if bool(self._options.token_optimization):
current_cost = estimate_cost(
agent_usage,
_find_model_config(self._current_model or "", self._model_configs),
Expand Down Expand Up @@ -975,7 +971,12 @@ async def _evaluate_acceptance_judge(
return dataclasses.replace(judge_result, duration_ms=judge_duration_ms, usage=judge_response.usage)

async def _get_agent_config(
self, agent_key: str, context: Context
self,
agent_key: str,
context: Context,
variation_key: Optional[str] = None,
project_key: Optional[str] = None,
base_url: Optional[str] = None,
) -> AIAgentConfig:
"""
Fetch the agent configuration, replacing the instructions with the raw variation
Expand All @@ -985,16 +986,39 @@ async def _get_agent_config(
(including the tracker). We then call variation() separately to retrieve the
unrendered instruction template and swap it in, keeping everything else intact.

When ``variation_key`` is provided the specific variation is fetched via the
LaunchDarkly REST API instead of using the SDK's default flag evaluation.

:param agent_key: The key for the agent to get the configuration for
:param context: The evaluation context
:param variation_key: Optional specific variation key to use as the base
:param project_key: LaunchDarkly project key; required when variation_key is set
:param base_url: Optional API base URL override
:return: AIAgentConfig with raw {{placeholder}} instruction templates intact
"""
try:
agent_config = self._ldClient.agent_config(agent_key, context)

# variation() returns the raw JSON before chevron.render(), so instructions
# still contain {{placeholder}} tokens rather than empty strings.
raw_variation = self._ldClient._client.variation(agent_key, context, {})
if variation_key:
assert self._api_key is not None
api_client = LDApiClient(
self._api_key,
**({"base_url": base_url} if base_url else {}),
)
ai_config = api_client.get_ai_config(project_key, agent_key)
match = next(
(v for v in (ai_config or {}).get("variations", []) if v.get("key") == variation_key),
None,
)
if match is None:
raise ValueError(
f"variation_key '{variation_key}' not found in agent config '{agent_key}'"
)
raw_variation = match
else:
# variation() returns the raw JSON before chevron.render(), so instructions
# still contain {{placeholder}} tokens rather than empty strings.
raw_variation = self._ldClient._client.variation(agent_key, context, {})
raw_instructions = raw_variation.get(
"instructions", agent_config.instructions
)
Expand Down Expand Up @@ -1030,20 +1054,20 @@ def _fetch_model_configs(
self,
project_key: Optional[str],
base_url: Optional[str],
judges: Optional[Dict[str, "OptimizationJudge"]],
token_optimization: Optional[bool],
) -> None:
"""Populate ``_model_configs`` from the LD API when credentials are available.

When an API key and project key are both present, fetches the model pricing
catalogue so that ``estimate_cost`` can produce USD figures and the cost gate
can make meaningful comparisons. If either is absent, ``_model_configs`` is
reset to an empty list and a warning is emitted when cost judges are in use —
cost optimization will silently pass rather than blocking the run.
reset to an empty list and a warning is emitted when token_optimization is
enabled — cost data will be unavailable and the cost gate will pass unconditionally.

:param project_key: LaunchDarkly project key, or None if not provided.
:param base_url: Optional API base URL override.
:param judges: Judge map from the caller's options, used only to decide
whether a cost-related warning is appropriate.
:param token_optimization: Whether token/cost optimization is enabled; used only to
decide whether a cost-related warning is appropriate.
"""
self._model_configs = []
if self._has_api_key and project_key:
Expand All @@ -1056,9 +1080,9 @@ def _fetch_model_configs(
self._model_configs = api_client.get_model_configs(project_key)
except Exception as exc:
logger.debug("Could not pre-fetch model configs: %s", exc)
elif _acceptance_criteria_implies_cost_optimization(judges or {}):
elif token_optimization:
logger.warning(
"Cost optimization requires LAUNCHDARKLY_API_KEY and project_key to be set; "
"Token optimization requires LAUNCHDARKLY_API_KEY and project_key to be set; "
"cost data will not be available and the cost gate will pass unconditionally"
)

Expand All @@ -1080,10 +1104,24 @@ async def optimize_from_options(
raise ValueError(
"auto_commit requires project_key to be set on OptimizationOptions"
)
if options.variation_key:
if not self._has_api_key:
raise ValueError(
"variation_key requires LAUNCHDARKLY_API_KEY to be set"
)
if not options.project_key:
raise ValueError(
"variation_key requires project_key to be set on OptimizationOptions"
)
self._agent_key = agent_key
self._fetch_model_configs(options.project_key, options.base_url, options.judges)
self._fetch_model_configs(options.project_key, options.base_url, options.token_optimization)
context = random.choice(options.context_choices)
agent_config = await self._get_agent_config(agent_key, context)
agent_config = await self._get_agent_config(
agent_key, context,
variation_key=options.variation_key,
project_key=options.project_key,
base_url=options.base_url,
)
result = await self._run_optimization(agent_config, options)
if options.auto_commit and self._last_run_succeeded and self._last_succeeded_context:
self._commit_variation(
Expand Down Expand Up @@ -1119,10 +1157,24 @@ async def optimize_from_ground_truth_options(
raise ValueError(
"auto_commit requires project_key to be set on GroundTruthOptimizationOptions"
)
if options.variation_key:
if not self._has_api_key:
raise ValueError(
"variation_key requires LAUNCHDARKLY_API_KEY to be set"
)
if not options.project_key:
raise ValueError(
"variation_key requires project_key to be set on GroundTruthOptimizationOptions"
)
self._agent_key = agent_key
self._fetch_model_configs(options.project_key, options.base_url, options.judges)
self._fetch_model_configs(options.project_key, options.base_url, options.token_optimization)
context = random.choice(options.context_choices)
agent_config = await self._get_agent_config(agent_key, context)
agent_config = await self._get_agent_config(
agent_key, context,
variation_key=options.variation_key,
project_key=options.project_key,
base_url=options.base_url,
)
result = await self._run_ground_truth_optimization(agent_config, options)
if options.auto_commit and self._last_run_succeeded and self._last_succeeded_context:
self._commit_variation(
Expand Down Expand Up @@ -1162,6 +1214,8 @@ async def _run_ground_truth_optimization(
on_failing_result=gt_options.on_failing_result,
on_status_update=gt_options.on_status_update,
token_limit=gt_options.token_limit,
latency_optimization=gt_options.latency_optimization,
token_optimization=gt_options.token_optimization,
)
self._options = bridge
self._agent_config = agent_config
Expand Down Expand Up @@ -1579,12 +1633,8 @@ async def _generate_new_variation(
)
self._safe_status_update("generating variation", status_ctx, iteration)

optimize_for_duration = _acceptance_criteria_implies_duration_optimization(
self._options.judges
)
optimize_for_cost = _acceptance_criteria_implies_cost_optimization(
self._options.judges
)
optimize_for_duration = bool(self._options.latency_optimization)
optimize_for_cost = bool(self._options.token_optimization)
quality_already_passing = self._all_judges_passing()
instructions = build_new_variation_prompt(
self._history,
Expand Down Expand Up @@ -1989,6 +2039,9 @@ def _persist_and_forward(
on_failing_result=options.on_failing_result,
on_status_update=_persist_and_forward,
token_limit=config.get("tokenLimit"),
latency_optimization=config.get("latencyOptimization"),
token_optimization=config.get("tokenOptimization"),
auto_commit=config.get("autoCommit", True),
Comment thread
cursor[bot] marked this conversation as resolved.
)

variable_choices: List[Dict[str, Any]] = config["variableChoices"] or [{}]
Expand All @@ -2009,6 +2062,9 @@ def _persist_and_forward(
on_failing_result=options.on_failing_result,
on_status_update=_persist_and_forward,
token_limit=config.get("tokenLimit"),
latency_optimization=config.get("latencyOptimization"),
token_optimization=config.get("tokenOptimization"),
auto_commit=config.get("autoCommit", True),
)

async def _execute_agent_turn(
Expand Down Expand Up @@ -2269,7 +2325,7 @@ def _apply_duration_gate(
:param ctx: Current optimization context.
:return: (passed, updated_ctx) where passed reflects gate outcome.
"""
if not _acceptance_criteria_implies_duration_optimization(self._options.judges):
if not bool(self._options.latency_optimization):
return passed_so_far, ctx
passed = self._evaluate_duration(ctx)
if passed:
Expand Down Expand Up @@ -2323,7 +2379,7 @@ def _apply_cost_gate(
:param ctx: Current optimization context.
:return: (passed, updated_ctx) where passed reflects gate outcome.
"""
if not _acceptance_criteria_implies_cost_optimization(self._options.judges):
if not bool(self._options.token_optimization):
return passed_so_far, ctx
passed = self._evaluate_cost(ctx)
if passed:
Expand Down
14 changes: 12 additions & 2 deletions packages/optimization/src/ldai_optimizer/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,14 @@ class OptimizationOptions:
context_choices: List[Context] = field(
default_factory=lambda: [Context.builder("anonymous").anonymous(True).build()]
)
# Base variation - Optional
variation_key: Optional[str] = None # use this specific variation as the base; defaults to the flag's default variation; requires API key + project_key
# Optimization controls - Optional; when None the corresponding gate/prompt is disabled
latency_optimization: Optional[bool] = None
token_optimization: Optional[bool] = None
# Auto-commit - Optional
auto_commit: bool = False
project_key: Optional[str] = None # required when auto_commit=True
project_key: Optional[str] = None # required when auto_commit=True or variation_key is set
output_key: Optional[str] = None # variation key/name; auto-generated if omitted
base_url: Optional[str] = None # override to target a non-default LD instance
on_passing_result: Optional[Callable[[OptimizationContext], None]] = None
Expand Down Expand Up @@ -440,9 +445,14 @@ class GroundTruthOptimizationOptions:
context_choices: List[Context] = field(
default_factory=lambda: [Context.builder("anonymous").anonymous(True).build()]
)
# Base variation - Optional
variation_key: Optional[str] = None # use this specific variation as the base; defaults to the flag's default variation; requires API key + project_key
# Optimization controls - Optional; when None the corresponding gate/prompt is disabled
latency_optimization: Optional[bool] = None
token_optimization: Optional[bool] = None
# Auto-commit - Optional
auto_commit: bool = False
project_key: Optional[str] = None # required when auto_commit=True
project_key: Optional[str] = None # required when auto_commit=True or variation_key is set
output_key: Optional[str] = None # variation key/name; auto-generated if omitted
base_url: Optional[str] = None # override to target a non-default LD instance
token_limit: Optional[int] = None # stop the run when total token usage reaches this value
Expand Down
3 changes: 3 additions & 0 deletions packages/optimization/src/ldai_optimizer/ld_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class AgentOptimizationConfig(_AgentOptimizationConfigRequired, total=False):
groundTruthResponses: List[str]
metricKey: str
tokenLimit: int
latencyOptimization: bool
tokenOptimization: bool
autoCommit: bool


# ---------------------------------------------------------------------------
Expand Down
59 changes: 0 additions & 59 deletions packages/optimization/src/ldai_optimizer/prompts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Prompt-building functions for LaunchDarkly AI optimization."""

import re
from typing import Any, Dict, List, Optional

from ldai_optimizer.dataclasses import (
Expand All @@ -9,64 +8,6 @@
)
from ldai_optimizer.util import judge_passed

# Case-insensitive pattern for latency-related wording in a judge's acceptance
# statement (e.g. "fast", "latency", "response time", "milliseconds").
# The final alternative matches a bare "ms" unit only when it is not directly
# preceded by a letter, so "200ms" and "200 ms" match while words that merely
# end in "ms" (such as "items") do not.
_DURATION_KEYWORDS = re.compile(
r"\b(fast|faster|quickly|quick|latency|low-latency|duration|response\s+time|"
r"time\s+to\s+respond|milliseconds|performant|snappy|efficient|seconds)\b|"
r"(?<![a-zA-Z])ms\b",
re.IGNORECASE,
)

# Case-insensitive pattern for cost-related wording in a judge's acceptance
# statement (e.g. "cheap", "budget", "pricing", "billing"). Every alternative is
# wrapped in word boundaries, so only whole words (or the hyphenated
# "cost-effective") match.
_COST_KEYWORDS = re.compile(
r"\b(cheap|cheaper|cheapest|costs?|costly|expensive|budget|affordable|"
r"spend|spending|economical|cost-effective|frugal|"
r"price|pricing|bill|billing)\b",
re.IGNORECASE,
)


def _acceptance_criteria_implies_duration_optimization(
    judges: Optional[Dict[str, OptimizationJudge]],
) -> bool:
    """Report whether any judge's acceptance statement asks for lower latency.

    Each judge's ``acceptance_statement`` is searched with the module-level
    ``_DURATION_KEYWORDS`` pattern (case-insensitive). Judges without an
    acceptance statement are ignored. A ``None`` or empty judge map yields
    ``False``.

    :param judges: Judge configuration dict from OptimizationOptions, or None.
    :return: True if at least one acceptance statement contains a latency keyword.
    """
    if not judges:
        return False
    statements = (entry.acceptance_statement for entry in judges.values())
    # Empty/None statements are skipped before the regex is consulted.
    return any(
        bool(text) and _DURATION_KEYWORDS.search(text) is not None
        for text in statements
    )


def _acceptance_criteria_implies_cost_optimization(
    judges: Optional[Dict[str, OptimizationJudge]],
) -> bool:
    """Report whether any judge's acceptance statement asks for lower cost.

    Each judge's ``acceptance_statement`` is searched with the module-level
    ``_COST_KEYWORDS`` pattern (case-insensitive). Judges without an
    acceptance statement are ignored. A ``None`` or empty judge map yields
    ``False``.

    :param judges: Judge configuration dict from OptimizationOptions, or None.
    :return: True if at least one acceptance statement contains a cost keyword.
    """
    if not judges:
        return False
    statements = (entry.acceptance_statement for entry in judges.values())
    # Empty/None statements are skipped before the regex is consulted.
    return any(
        bool(text) and _COST_KEYWORDS.search(text) is not None
        for text in statements
    )


def build_message_history_text(
history: List[OptimizationContext],
Expand Down
Loading
Loading