forked from plastic-labs/honcho
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.py
More file actions
420 lines (381 loc) · 15.2 KB
/
Copy pathapi.py
File metadata and controls
420 lines (381 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
"""Public LLM entrypoint: `honcho_llm_call`.
Orchestrates:
- Runtime config resolution from ConfiguredModelSettings → ModelConfig.
- Per-attempt planning (primary vs fallback selection).
- Retry with exponential backoff via tenacity.
- Tool-loop delegation when tools are supplied.
- Single-call delegation to the executor otherwise.
- Reasoning-trace telemetry emission.
"""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator, Callable
from typing import Any, Literal, TypeVar, cast, overload
from pydantic import BaseModel
from sentry_sdk.ai.monitoring import ai_track
from tenacity import retry, stop_after_attempt, wait_exponential
from src.config import ConfiguredModelSettings, ModelConfig
from src.exceptions import ValidationException
from src.telemetry.llm_call_metrics import (
finalize_success,
mark_max_iterations,
observe_llm_call,
)
from src.telemetry.logging import conditional_observe
from src.telemetry.reasoning_traces import log_reasoning_trace
from .executor import honcho_llm_call_inner
from .runtime import (
AttemptPlan,
current_attempt,
effective_temperature,
plan_attempt,
resolve_runtime_model_config,
update_current_langfuse_observation,
)
from .tool_loop import execute_tool_loop
from .types import (
HonchoLLMCallResponse,
HonchoLLMCallStreamChunk,
IterationCallback,
ReasoningEffortType,
StreamingResponseWithMetadata,
)
logger = logging.getLogger(__name__)
M = TypeVar("M", bound=BaseModel)
@overload
async def honcho_llm_call(
*,
model_config: ModelConfig | ConfiguredModelSettings,
prompt: str,
max_tokens: int,
track_name: str | None = None,
response_model: type[M],
json_mode: bool = False,
temperature: float | None = None,
stop_seqs: list[str] | None = None,
reasoning_effort: ReasoningEffortType = None,
verbosity: Literal["low", "medium", "high"] | None = None,
thinking_budget_tokens: int | None = None,
enable_retry: bool = True,
retry_attempts: int = 3,
stream: Literal[False] = False,
stream_final_only: bool = False,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
tool_executor: Callable[[str, dict[str, Any]], Any] | None = None,
max_tool_iterations: int = 10,
messages: list[dict[str, Any]] | None = None,
max_input_tokens: int | None = None,
trace_name: str | None = None,
iteration_callback: IterationCallback | None = None,
) -> HonchoLLMCallResponse[M]: ...
@overload
async def honcho_llm_call(
*,
model_config: ModelConfig | ConfiguredModelSettings,
prompt: str,
max_tokens: int,
track_name: str | None = None,
response_model: None = None,
json_mode: bool = False,
temperature: float | None = None,
stop_seqs: list[str] | None = None,
reasoning_effort: ReasoningEffortType = None,
verbosity: Literal["low", "medium", "high"] | None = None,
thinking_budget_tokens: int | None = None,
enable_retry: bool = True,
retry_attempts: int = 3,
stream: Literal[False] = False,
stream_final_only: bool = False,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
tool_executor: Callable[[str, dict[str, Any]], Any] | None = None,
max_tool_iterations: int = 10,
messages: list[dict[str, Any]] | None = None,
max_input_tokens: int | None = None,
trace_name: str | None = None,
iteration_callback: IterationCallback | None = None,
) -> HonchoLLMCallResponse[str]: ...
@overload
async def honcho_llm_call(
*,
model_config: ModelConfig | ConfiguredModelSettings,
prompt: str,
max_tokens: int,
track_name: str | None = None,
response_model: type[BaseModel] | None = None,
json_mode: bool = False,
temperature: float | None = None,
stop_seqs: list[str] | None = None,
reasoning_effort: ReasoningEffortType = None,
verbosity: Literal["low", "medium", "high"] | None = None,
thinking_budget_tokens: int | None = None,
enable_retry: bool = True,
retry_attempts: int = 3,
stream: Literal[True] = ...,
stream_final_only: bool = False,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
tool_executor: Callable[[str, dict[str, Any]], Any] | None = None,
max_tool_iterations: int = 10,
messages: list[dict[str, Any]] | None = None,
max_input_tokens: int | None = None,
trace_name: str | None = None,
iteration_callback: IterationCallback | None = None,
) -> AsyncIterator[HonchoLLMCallStreamChunk] | StreamingResponseWithMetadata: ...
@conditional_observe(name="LLM Call")
async def honcho_llm_call(
*,
model_config: ModelConfig | ConfiguredModelSettings,
prompt: str,
max_tokens: int,
track_name: str | None = None,
response_model: type[BaseModel] | None = None,
json_mode: bool = False,
temperature: float | None = None,
stop_seqs: list[str] | None = None,
reasoning_effort: ReasoningEffortType = None,
verbosity: Literal["low", "medium", "high"] | None = None,
thinking_budget_tokens: int | None = None,
enable_retry: bool = True,
retry_attempts: int = 3,
stream: bool = False,
stream_final_only: bool = False,
tools: list[dict[str, Any]] | None = None,
tool_choice: str | dict[str, Any] | None = None,
tool_executor: Callable[[str, dict[str, Any]], Any] | None = None,
max_tool_iterations: int = 10,
messages: list[dict[str, Any]] | None = None,
max_input_tokens: int | None = None,
trace_name: str | None = None,
iteration_callback: IterationCallback | None = None,
) -> (
HonchoLLMCallResponse[Any]
| AsyncIterator[HonchoLLMCallStreamChunk]
| StreamingResponseWithMetadata
):
"""Make an LLM call with retry, optional backup failover, and optional tool loop.
Backup provider/model (if configured on the primary ModelConfig's
`fallback`) is used on the final retry attempt, which is 3 by default.
Raises:
ValidationException: If streaming and tool calling are combined
without `stream_final_only=True`.
"""
runtime_model_config = resolve_runtime_model_config(model_config)
# Caller kwargs left at None are resolved downstream by
# effective_config_for_call against whichever ModelConfig wins the
# attempt (primary or fallback). Defaulting here from
# runtime_model_config would clobber a fallback config's own
# temperature/thinking params on the final retry, so we deliberately
# keep the locals as the caller supplied them.
if stream and tools and not stream_final_only:
raise ValidationException(
"Streaming is not supported with tool calling. "
+ "Set stream=False when using tools, or use stream_final_only=True "
+ "to stream only the final response after tool calls."
)
# tenacity uses 1-indexed attempts.
current_attempt.set(1)
# Captures the AttemptPlan that produced the most recent (and on success,
# the winning) call so observability can label by the model that actually
# answered — primary on early attempts, backup on the final retry.
last_plan: dict[str, AttemptPlan | None] = {"value": None}
def _get_attempt_plan() -> AttemptPlan:
plan = plan_attempt(
runtime_model_config=runtime_model_config,
attempt=current_attempt.get(),
retry_attempts=retry_attempts,
call_thinking_budget_tokens=thinking_budget_tokens,
call_reasoning_effort=reasoning_effort,
)
last_plan["value"] = plan
update_current_langfuse_observation(
plan.provider,
plan.model,
name=track_name,
)
return plan
async def _call_with_provider_selection() -> (
HonchoLLMCallResponse[Any] | AsyncIterator[HonchoLLMCallStreamChunk]
):
"""Select provider/model based on current attempt, then call once.
This closure is what tenacity wraps, so selection re-runs per attempt
(and the fallback kicks in on the final attempt automatically).
"""
plan = _get_attempt_plan()
if stream:
return await honcho_llm_call_inner(
plan.provider,
plan.model,
prompt,
max_tokens,
response_model,
json_mode,
effective_temperature(temperature),
stop_seqs,
plan.reasoning_effort,
verbosity,
plan.thinking_budget_tokens,
stream=True,
client_override=plan.client,
tools=tools,
tool_choice=tool_choice,
selected_config=plan.selected_config,
)
return await honcho_llm_call_inner(
plan.provider,
plan.model,
prompt,
max_tokens,
response_model,
json_mode,
effective_temperature(temperature),
stop_seqs,
plan.reasoning_effort,
verbosity,
plan.thinking_budget_tokens,
stream=False,
client_override=plan.client,
tools=tools,
tool_choice=tool_choice,
selected_config=plan.selected_config,
)
decorated = _call_with_provider_selection
if track_name:
decorated = ai_track(track_name)(decorated)
def before_retry_callback(retry_state: Any) -> None:
"""Update attempt counter before each retry + log transient failures.
tenacity's before_sleep fires AFTER an attempt fails, BEFORE sleeping,
so we increment to the next attempt number here.
"""
next_attempt = retry_state.attempt_number + 1
current_attempt.set(next_attempt)
exc = retry_state.outcome.exception() if retry_state.outcome else None
if exc:
logger.warning(
f"Error on attempt {retry_state.attempt_number}/{retry_attempts} with "
+ f"{runtime_model_config.transport}/{runtime_model_config.model}: {exc}"
)
logger.info(f"Will retry with attempt {next_attempt}/{retry_attempts}")
if enable_retry:
decorated = retry(
stop=stop_after_attempt(retry_attempts),
wait=wait_exponential(multiplier=1, min=4, max=10),
before_sleep=before_retry_callback,
)(decorated)
def _trace_thinking_budget() -> int | None:
# Trace log should reflect what got applied, so fall back to the
# runtime config's value when the caller left the kwarg unset.
return (
thinking_budget_tokens
if thinking_budget_tokens is not None
else runtime_model_config.thinking_budget_tokens
)
def _trace_reasoning_effort() -> ReasoningEffortType:
if reasoning_effort is not None:
return reasoning_effort
config_effort = runtime_model_config.thinking_effort
return cast(ReasoningEffortType, config_effort) if config_effort else None
def _trace_stop_seqs() -> list[str] | None:
return (
stop_seqs if stop_seqs is not None else runtime_model_config.stop_sequences
)
with observe_llm_call(
track_name=track_name,
trace_name=trace_name,
runtime_model_config=runtime_model_config,
) as obs_state:
# Tool-less path: call once and return.
if not tools or not tool_executor:
result: (
HonchoLLMCallResponse[Any] | AsyncIterator[HonchoLLMCallStreamChunk]
) = await decorated()
response_for_metrics = (
result if isinstance(result, HonchoLLMCallResponse) else None
)
winning = last_plan["value"]
finalize_success(
obs_state,
response=response_for_metrics,
final_provider=str(winning.provider) if winning else None,
final_model=winning.model if winning else None,
attempts=current_attempt.get(),
iterations=None,
has_backup=runtime_model_config.fallback is not None,
)
if trace_name and isinstance(result, HonchoLLMCallResponse):
log_reasoning_trace(
task_type=trace_name,
model_config=runtime_model_config,
prompt=prompt,
response=result,
max_tokens=max_tokens,
thinking_budget_tokens=_trace_thinking_budget(),
reasoning_effort=_trace_reasoning_effort(),
json_mode=json_mode,
stop_seqs=_trace_stop_seqs(),
messages=messages,
)
return result
# execute_tool_loop raises ValidationException on out-of-range
# max_tool_iterations; fail-fast is cheaper than silent clamping here.
result = await execute_tool_loop(
prompt=prompt,
max_tokens=max_tokens,
messages=messages,
tools=tools,
tool_choice=tool_choice,
tool_executor=tool_executor,
max_tool_iterations=max_tool_iterations,
response_model=response_model,
json_mode=json_mode,
temperature=temperature,
stop_seqs=stop_seqs,
verbosity=verbosity,
enable_retry=enable_retry,
retry_attempts=retry_attempts,
max_input_tokens=max_input_tokens,
get_attempt_plan=_get_attempt_plan,
before_retry_callback=before_retry_callback,
stream_final=stream_final_only,
iteration_callback=iteration_callback,
track_name=track_name,
trace_name=trace_name,
)
response_for_metrics = (
result if isinstance(result, HonchoLLMCallResponse) else None
)
winning = last_plan["value"]
iterations = (
response_for_metrics.iterations
if response_for_metrics
else (getattr(result, "iterations", None))
)
finalize_success(
obs_state,
response=response_for_metrics,
final_provider=str(winning.provider) if winning else None,
final_model=winning.model if winning else None,
attempts=current_attempt.get(),
iterations=iterations,
has_backup=runtime_model_config.fallback is not None,
)
if response_for_metrics is not None and getattr(
response_for_metrics, "hit_max_iterations", False
):
mark_max_iterations(obs_state, iterations or max_tool_iterations)
if trace_name and isinstance(result, HonchoLLMCallResponse):
log_reasoning_trace(
task_type=trace_name,
model_config=runtime_model_config,
prompt=prompt,
response=result,
max_tokens=max_tokens,
thinking_budget_tokens=_trace_thinking_budget(),
reasoning_effort=_trace_reasoning_effort(),
json_mode=json_mode,
stop_seqs=_trace_stop_seqs(),
messages=messages,
)
return result
__all__ = ["honcho_llm_call"]