-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathutil.py
More file actions
359 lines (292 loc) · 13.8 KB
/
util.py
File metadata and controls
359 lines (292 loc) · 13.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
"""Utility functions for the LaunchDarkly AI optimization package."""
import inspect
import json
import logging
import random
import re
from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple, TypeVar, Union
if TYPE_CHECKING:
from ldai.tracker import TokenUsage
from ldai_optimizer._slug_words import _ADJECTIVES, _NOUNS
logger = logging.getLogger(__name__)

# Matches LaunchDarkly API key and SDK key formats:
#   api-<hex/alphanumeric, 16+ chars>
#   sdk-<hex/alphanumeric, 16+ chars>
#   cli-<hex/alphanumeric, 16+ chars>
_KEY_PATTERN = re.compile(r"\b(api|sdk|cli)-[A-Za-z0-9_\-]{16,}\b")


class RedactionFilter(logging.Filter):
    """Logging filter that redacts strings resembling LaunchDarkly API keys.

    Scrubs the format string (``record.msg``) and every *string* argument in
    ``record.args`` before the handler formats the final log line. Both
    argument styles accepted by ``logging`` are supported:

    * positional arguments (``record.args`` is a tuple), and
    * a single mapping argument used with ``%(name)s`` placeholders
      (``record.args`` is a dict).

    Non-string arguments are passed through untouched so that numeric format
    specifiers such as ``%d`` keep working.
    """

    @staticmethod
    def _redact(value: Any) -> Any:
        """Return ``value`` with key-like substrings masked when it is a string."""
        if isinstance(value, str):
            return _KEY_PATTERN.sub("[REDACTED]", value)
        return value

    def filter(self, record: logging.LogRecord) -> bool:
        """Redact key-like substrings on ``record`` in place.

        :param record: The log record about to be handled.
        :return: Always ``True`` — the record is never dropped, only scrubbed.
        """
        record.msg = _KEY_PATTERN.sub("[REDACTED]", str(record.msg))

        args = record.args
        if not args:
            return True

        if isinstance(args, dict):
            # Mapping-style args must stay a mapping: wrapping them in a tuple
            # would make ``msg % args`` fail for ``%(name)s`` placeholders.
            record.args = {name: self._redact(value) for name, value in args.items()}
        elif isinstance(args, tuple):
            record.args = tuple(self._redact(arg) for arg in args)
        elif isinstance(args, str):
            # A lone string argument formats the same way as a 1-tuple.
            record.args = (self._redact(args),)
        # Any other object is left as-is; it is not a string, so there is
        # nothing to scrub, and re-wrapping it could change how it formats.
        return True


logger.addFilter(RedactionFilter())
def generate_slug() -> str:
    """Build a random two-word slug of the form ``adjective-noun``.

    One word is drawn from the internal adjective list and one from the
    internal noun list (e.g. ``blazing-lobster``). The output format mirrors
    ``coolname.generate_slug(2)`` without needing that external package.

    :return: The chosen adjective and noun joined by a single hyphen.
    """
    adjective = random.choice(_ADJECTIVES)
    noun = random.choice(_NOUNS)
    return f"{adjective}-{noun}"
def interpolate_variables(text: str, variables: Dict[str, Any]) -> str:
    """
    Interpolate ``{{variable}}`` placeholders in text using the provided variables.

    Matches LaunchDarkly's Mustache-style template format so that manually
    generated variation instructions use the same syntax as LD-fetched templates.
    As in Mustache, whitespace just inside the braces is ignored, so
    ``{{ key }}`` and ``{{key}}`` refer to the same variable.
    Unrecognised placeholders are left unchanged (including their original
    spacing).

    :param text: Template string potentially containing ``{{key}}`` placeholders
    :param variables: Mapping of variable names to their replacement values
    :return: Text with all recognised placeholders replaced
    """
    def replace(match: "re.Match[str]") -> str:
        # Group 1 is the bare key; surrounding whitespace is consumed by the
        # pattern itself, so no stripping is needed here.
        key = match.group(1)
        return str(variables[key]) if key in variables else match.group(0)

    return re.sub(r"\{\{\s*([\w-]+)\s*\}\}", replace, text)
def _replace_leaked_value(
    text: str,
    value: str,
    placeholder: str,
    keep_braced: bool,
) -> Tuple[str, int]:
    """Swap occurrences of ``value`` in ``text`` for ``placeholder``.

    Two forms are recognised in a single left-to-right pass:

    * ``{{value}}`` (optionally with whitespace inside the braces) — the value
      was written as if it were a placeholder key;
    * the bare value, guarded so it is not matched inside a longer token.

    :param text: Text to scan.
    :param value: Literal value to look for.
    :param placeholder: Replacement text, inserted verbatim.
    :param keep_braced: When true, ``{{value}}`` forms are left untouched
        (used when ``value`` is itself a legitimate variable name).
    :return: ``(new_text, number_of_replacements_made)``.
    """
    escaped = re.escape(value)
    braced_form = r"(?P<braced>\{\{\s*" + escaped + r"\s*\}\})"
    bare_form = r"(?<![A-Za-z0-9_\-])" + escaped + r"(?![A-Za-z0-9_\-])"
    replaced = 0

    def substitute(match: "re.Match[str]") -> str:
        nonlocal replaced
        if keep_braced and match.group("braced") is not None:
            # A real placeholder for another variable — consume it unchanged
            # so the bare-value alternative cannot match inside it.
            return match.group(0)
        replaced += 1
        return placeholder

    # A callable replacement is used so that backslashes in the placeholder
    # are never interpreted as regex replacement escapes.
    new_text = re.sub(braced_form + "|" + bare_form, substitute, text)
    return new_text, replaced


def restore_variable_placeholders(
    text: str,
    variable_choices: List[Dict[str, Any]],
    min_value_length: int = 3,
) -> Tuple[str, List[str]]:
    """
    Scan ``text`` for leaked variable values and restore them to ``{{key}}`` form.

    This is the inverse of :func:`interpolate_variables`. It acts as a
    post-processing safety net after variation generation: when the LLM
    hardcodes a concrete variable value (e.g. ``user-123``) instead of writing
    the placeholder (``{{user_id}}``), this function replaces the value back so
    subsequent iterations receive correctly templated instructions.

    Values are matched with boundary guards so that a value like ``user-123``
    inside a longer token like ``user-1234`` is not substituted. Values are
    escaped with ``re.escape`` and therefore matched literally, including
    multi-line values.

    A value written between braces (``{{user-123}}`` or ``{{ user-123 }}``) is
    replaced as a whole, so no nested braces are produced. The one exception
    is a value that is also the name of a variable: ``{{that_name}}`` is then
    a valid placeholder and is left untouched; only bare occurrences of the
    value are replaced.

    Values shorter than ``min_value_length`` characters are skipped because
    short strings (e.g. ``"en"``, ``"US"``) are too likely to appear
    coincidentally in unrelated prose.

    :param text: The generated instruction string to clean.
    :param variable_choices: All possible variable dicts, used to build the
        reverse value→key map. When the same value appears under multiple keys
        the first key encountered wins.
    :param min_value_length: Minimum character length a value must have before
        it is considered for replacement. Defaults to 3.
    :return: A tuple of ``(cleaned_text, warnings)`` where ``warnings`` is a
        list of human-readable strings describing each replacement made.
    """
    # Build reverse map: str(value) → key (first key wins), and remember every
    # variable name so existing ``{{name}}`` placeholders can be protected.
    value_to_key: Dict[str, str] = {}
    known_keys = set()
    for choice in variable_choices:
        for key, value in choice.items():
            known_keys.add(key)
            value_to_key.setdefault(str(value), key)

    # Longest values first so that a longer value like "user-123-admin" is
    # replaced before the shorter "user-123" substring, preventing
    # partial-match corruption.
    sorted_entries = sorted(value_to_key.items(), key=lambda kv: len(kv[0]), reverse=True)

    warnings: List[str] = []
    for value, key in sorted_entries:
        # Too short to replace safely, or simply not present — nothing to fix.
        if len(value) < min_value_length or value not in text:
            continue

        placeholder = f"{{{{{key}}}}}"
        text, total_count = _replace_leaked_value(
            text,
            value,
            placeholder,
            keep_braced=value in known_keys,
        )

        if total_count:
            warnings.append(
                f"Variable value {value!r} found in generated instructions "
                f"— replaced {total_count} occurrence(s) with placeholder {placeholder}"
            )

    return text, warnings
_T = TypeVar("_T")


async def await_if_needed(result: Union[_T, Awaitable[_T]]) -> _T:
    """
    Resolve the outcome of a callable that may be synchronous or asynchronous.

    Plain values are handed back untouched; awaitables are awaited first.

    :param result: Either a value or an awaitable that returns a value
    :return: The resolved value
    """
    if not inspect.isawaitable(result):
        return result  # type: ignore[return-value]
    return await result  # type: ignore[return-value]
def validate_variation_response(response_data: Dict[str, Any]) -> List[str]:
    """Validate the shape of a parsed LLM variation response.

    Checks that the three required fields are present and have the expected
    types. An empty ``current_parameters`` dict is acceptable; an empty (or
    whitespace-only) ``current_instructions`` or ``model`` string is flagged
    as an error because downstream code cannot meaningfully use a blank value.

    A payload that is not a dict at all (e.g. a JSON array) yields a single
    error rather than raising.

    :param response_data: Parsed dict from the LLM (output of extract_json_from_response).
    :return: List of human-readable error strings. Empty list means the response is valid.
    """
    if not isinstance(response_data, dict):
        return [f"response must be a JSON object, got {type(response_data).__name__}"]

    errors: List[str] = []

    if "current_instructions" not in response_data:
        errors.append("missing required field 'current_instructions'")
    elif not isinstance(response_data["current_instructions"], str):
        errors.append(
            f"'current_instructions' must be a string, "
            f"got {type(response_data['current_instructions']).__name__}"
        )
    elif not response_data["current_instructions"].strip():
        errors.append("'current_instructions' must not be empty")

    if "current_parameters" not in response_data:
        errors.append("missing required field 'current_parameters'")
    elif not isinstance(response_data["current_parameters"], dict):
        errors.append(
            f"'current_parameters' must be a dict, "
            f"got {type(response_data['current_parameters']).__name__}"
        )

    if "model" not in response_data:
        errors.append("missing required field 'model'")
    elif not isinstance(response_data["model"], str):
        errors.append(
            f"'model' must be a string, got {type(response_data['model']).__name__}"
        )
    elif not response_data["model"].strip():
        # Same rule as for instructions: a blank model name is unusable.
        errors.append("'model' must not be empty")

    return errors
def extract_json_from_response(response_str: str) -> Dict[str, Any]:
    """
    Parse a JSON object from an LLM response string.

    Strategies are tried in order until one yields a JSON *object* (dict):

    1. Parse the whole response as JSON. A successful parse that is not an
       object (array, string, number, ...) is not accepted and the remaining
       strategies are tried instead.
    2. Extract the first object found inside a markdown code fence.
    3. Try to decode an object starting at each ``{`` in the response, using
       the JSON decoder itself so that braces inside string values do not
       confuse the scan. A warning is logged when this fallback is entered.
    4. Legacy regex looking for an object mentioning ``"current_instructions"``.

    :param response_str: Raw string response from an LLM
    :return: Parsed dictionary
    :raises ValueError: If no valid JSON object can be extracted
    """
    # Strategy 1: direct parse, accepted only when it produces an object.
    try:
        direct = json.loads(response_str)
    except json.JSONDecodeError:
        direct = None
    if isinstance(direct, dict):
        return direct

    response_data: Optional[Dict[str, Any]] = None

    # Strategy 2: JSON inside a markdown code block.
    code_block_match = re.search(
        r'```(?:json)?\s*(\{.*?\})\s*```',
        response_str,
        re.DOTALL,
    )
    if code_block_match:
        try:
            response_data = json.loads(code_block_match.group(1))
        except json.JSONDecodeError:
            pass

    # Strategy 3: decode an object starting at each '{' in turn.
    if response_data is None:
        start_idx = response_str.find('{')
        if start_idx != -1:
            logger.warning(
                "Direct JSON parse and code-block extraction failed; "
                "falling back to balanced-brace scanner. "
                "Response may be malformed JSON (length: %d).",
                len(response_str),
            )
        decoder = json.JSONDecoder()
        while start_idx != -1:
            try:
                # raw_decode parses one JSON document at the start of the
                # string and ignores whatever trails it. The candidate starts
                # with '{', so a successful decode is always an object.
                response_data, _ = decoder.raw_decode(response_str[start_idx:])
                break
            except json.JSONDecodeError:
                start_idx = response_str.find('{', start_idx + 1)

    # Strategy 4: legacy regex fallback.
    if response_data is None:
        json_match = re.search(
            r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*"current_instructions"[^{}]*(?:\{[^{}]*\}[^{}]*)*\}',
            response_str,
            re.DOTALL,
        )
        if json_match:
            try:
                response_data = json.loads(json_match.group())
            except json.JSONDecodeError as exc:
                logger.debug(
                    "Extracted JSON string failed to parse: %s",
                    json_match.group()[:200],
                )
                raise ValueError(
                    "Failed to parse extracted JSON from variation generation response"
                ) from exc

    if response_data is None:
        logger.debug(
            "Failed to extract JSON from response. "
            "Response length: %d",
            len(response_str),
        )
        raise ValueError(
            "Failed to parse structured output from variation generation. "
            "Expected JSON object with 'current_instructions', 'current_parameters', and 'model' fields. "
            f"Response length: {len(response_str)}"
        )

    return response_data
def judge_passed(score: float, threshold: float, is_inverted: bool) -> bool:
    """Decide whether a judge score satisfies its threshold.

    Standard judges treat higher scores as better and pass when
    ``score >= threshold``. Inverted judges (lower is better, e.g. toxicity)
    pass when ``score <= threshold``. A score equal to the threshold passes
    in both modes.
    """
    if is_inverted:
        return score <= threshold
    return score >= threshold
def estimate_cost(
    usage: Optional["TokenUsage"],
    model_config: Optional[Dict[str, Any]],
) -> Optional[float]:
    """Estimate the USD cost of one agent call from its token usage.

    Pricing is read from the ``costPerInputToken`` and ``costPerOutputToken``
    entries of the model config. Each side contributes only when both its
    price and its token count are available. The result is either a cost or
    ``None`` — never a raw token count — so values stay comparable across
    iterations whose models may or may not expose pricing.

    ``costPerCachedInputToken`` is intentionally ignored — the estimate uses
    input/output tokens only.

    :param usage: Token usage from the agent call. When ``None``, returns ``None``.
    :param model_config: Model config dict from ``get_model_configs()``, or ``None``.
    :return: Estimated cost in USD, or ``None`` when usage is absent, when the
        config carries neither price, or when no side has both a price and a
        token count.
    """
    if usage is None:
        return None

    pricing = model_config or {}
    in_rate = pricing.get("costPerInputToken")
    out_rate = pricing.get("costPerOutputToken")
    if in_rate is None and out_rate is None:
        return None

    # Collect the priced components in input-then-output order.
    charges = []
    if in_rate is not None and usage.input is not None:
        charges.append(usage.input * in_rate)
    if out_rate is not None and usage.output is not None:
        charges.append(usage.output * out_rate)

    if not charges:
        return None
    return sum(charges, 0.0)