-
Notifications
You must be signed in to change notification settings - Fork 91
LCORE-2310: Added non-streaming agent query utilities #1880
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,328 @@ | ||
| """Non-streaming agent helpers and shared turn-summary builders for agent runs.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from enum import Enum | ||
| from typing import TypeAlias, cast | ||
|
|
||
| from fastapi import HTTPException | ||
| from llama_stack_client import APIConnectionError, APIStatusError, AsyncLlamaStackClient | ||
| from pydantic_ai.exceptions import ( | ||
| AgentRunError, | ||
| ContentFilterError, | ||
| IncompleteToolCall, | ||
| ModelAPIError, | ||
| ModelHTTPError, | ||
| UnexpectedModelBehavior, | ||
| UsageLimitExceeded, | ||
| ) | ||
| from pydantic_ai.messages import ModelRequest, ModelResponse, ToolReturnPart | ||
| from pydantic_ai.run import AgentRunResult | ||
| from pydantic_ai.usage import RunUsage | ||
|
|
||
| from configuration import configuration | ||
| from log import get_logger | ||
| from metrics import recording | ||
| from models.api.responses.error import ( | ||
| AbstractErrorResponse, | ||
| InternalServerErrorResponse, | ||
| PromptTooLongResponse, | ||
| QuotaExceededResponse, | ||
| ServiceUnavailableResponse, | ||
| ) | ||
| from models.common.agents import AgentTurnAccumulator | ||
| from models.common.moderation import ShieldModerationResult | ||
| from models.common.responses.responses_api_params import ResponsesApiParams | ||
| from models.common.turn_summary import TurnSummary | ||
| from utils.agents.tool_processor import ( | ||
| process_function_tool_call, | ||
| process_function_tool_result, | ||
| process_native_tool_call, | ||
| process_native_tool_result, | ||
| ) | ||
| from utils.conversations import append_turn_items_to_conversation | ||
| from utils.pydantic_ai import build_agent | ||
| from utils.query import ( | ||
| extract_provider_and_model_from_model_id, | ||
| handle_known_apistatus_errors, | ||
| is_context_length_error, | ||
| ) | ||
| from utils.responses import extract_vector_store_ids_from_tools | ||
| from utils.token_counter import TokenCounter | ||
|
|
||
| logger = get_logger(__name__) | ||
|
|
||
| AgentInferenceError: TypeAlias = ( | ||
| AgentRunError | APIStatusError | APIConnectionError | RuntimeError | ||
| ) | ||
|
|
||
|
|
||
| class AgentFinishReason(str, Enum): | ||
| """Finish reason for a completed agent model response.""" | ||
|
|
||
| CONTENT_FILTER = "content_filter" | ||
| CANCELLED = "cancelled" | ||
| SUCCESS = "stop" | ||
| LENGTH = "length" | ||
| ERROR = "error" | ||
|
|
||
|
|
||
| def map_agent_inference_error( | ||
| exc: AgentInferenceError, | ||
| model_id: str, | ||
| ) -> AbstractErrorResponse: | ||
| """Map agent run failures from pydantic-ai or Llama Stack to an LCS error response. | ||
|
|
||
| Args: | ||
| exc: Agent, HTTP status, connection, or context-length runtime error. | ||
| model_id: Model identifier in provider/model format. | ||
|
|
||
| Returns: | ||
| Structured error response for HTTP or SSE error events. | ||
|
|
||
| Raises: | ||
| RuntimeError: Re-raised when ``exc`` is a non-agent ``RuntimeError`` that is | ||
| not a recognized context-length failure. | ||
| """ | ||
| match exc: | ||
| case AgentRunError() as agent_exc: | ||
| return map_pydantic_agent_run_error(agent_exc, model_id) | ||
| case APIStatusError() as status_exc: | ||
| return handle_known_apistatus_errors(status_exc, model_id) | ||
| case APIConnectionError() as connection_exc: | ||
| return ServiceUnavailableResponse( | ||
| backend_name="Llama Stack", | ||
| cause=str(connection_exc), | ||
| ) | ||
| case RuntimeError() as runtime_exc if is_context_length_error(str(runtime_exc)): | ||
| return PromptTooLongResponse(model=model_id) | ||
| case _: | ||
| return InternalServerErrorResponse.generic() | ||
|
|
||
|
|
||
| def map_pydantic_agent_run_error( | ||
| exc: AgentRunError, model_id: str | ||
| ) -> AbstractErrorResponse: | ||
| """Map pydantic-ai ``AgentRunError`` subclasses to LCS error responses. | ||
|
|
||
| Args: | ||
| exc: Agent exception to map. | ||
| model_id: Model identifier in provider/model format. | ||
|
|
||
| Returns: | ||
| Structured error response for HTTP or SSE error events. | ||
| """ | ||
| match exc: | ||
| case ContentFilterError() as filter_exc: | ||
| return InternalServerErrorResponse.query_failed(str(filter_exc)) | ||
| case IncompleteToolCall() | UnexpectedModelBehavior(): | ||
| return PromptTooLongResponse(model=model_id) | ||
| case UsageLimitExceeded(): | ||
| return QuotaExceededResponse.model(model_id) | ||
| case ModelHTTPError() as http_exc if is_context_length_error(str(http_exc)): | ||
| return PromptTooLongResponse(model=model_id) | ||
| case ModelHTTPError(status_code=429): | ||
| return QuotaExceededResponse.model(model_id) | ||
| case ModelHTTPError(): | ||
| return InternalServerErrorResponse.generic() | ||
| case ModelAPIError() as api_exc: | ||
| return ServiceUnavailableResponse( | ||
| backend_name="Llama Stack", | ||
| cause=str(api_exc), | ||
| ) | ||
| case _: | ||
| return InternalServerErrorResponse.query_failed(str(exc)) | ||
|
|
||
|
|
||
| def get_agent_finish_reason(response: ModelResponse) -> AgentFinishReason: | ||
| """Get the finish reason from a completed agent model response. | ||
|
|
||
| Args: | ||
| response: Last model response from the agent run. | ||
|
|
||
| Returns: | ||
| Resolved finish reason. | ||
| """ | ||
| raw_finish_reason = (response.provider_details or {}).get("finish_reason") | ||
| if raw_finish_reason == "cancelled": | ||
| return AgentFinishReason.CANCELLED | ||
| if response.finish_reason is None: | ||
| return AgentFinishReason.ERROR | ||
| return AgentFinishReason(response.finish_reason) | ||
|
|
||
|
Comment on lines
+149
to
+152
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Check pydantic-ai FinishReason definition to confirm possible values
rg -n "FinishReason" --type py -A 5Repository: lightspeed-core/lightspeed-stack Length of output: 7310 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Show the relevant enum + get_agent_finish_reason implementation
sed -n '40,180p' src/utils/agents/query.py
# Show the unit tests for get_agent_finish_reason
sed -n '120,220p' tests/unit/utils/agents/test_query.pyRepository: lightspeed-core/lightspeed-stack Length of output: 8661 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Locate imports/definitions around ModelResponse / FinishReason usage
rg -n "from .* import .*ModelResponse|ModelResponse" src/utils/agents/query.py src/utils -S
# Find how finish_reason is typed/used elsewhere
rg -n "\"tool_calls\"|\"tool_output\"|finish_reason\s*:" src -S
# Show utils.pydantic_ai module (likely where ModelResponse/FinishReason are re-exported)
fd -a -t f "pydantic_ai.py" src/utils -x echo
fd -a -t f "pydantic_ai*" src/utils -x echoRepository: lightspeed-core/lightspeed-stack Length of output: 1254 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Inspect utils/pydantic_ai.py for re-exports/types that define ModelResponse / FinishReason
sed -n '1,220p' src/utils/pydantic_ai.py
# Find any references to "finish_reason" across the repo
rg -n "finish_reason" src -S
# Inspect pydantic-ai ModelResponse type hints only if vendored/aliased in this repo;
# otherwise, we can at least see whether code ever expects tool_calls/tool_output.
rg -n "\"tool_calls\"|\"tool_output\"" src -SRepository: lightspeed-core/lightspeed-stack Length of output: 5568 🏁 Script executed: #!/bin/bash
set -euo pipefail
python3 - <<'PY'
import inspect
import typing
try:
import pydantic_ai
from pydantic_ai.messages import ModelResponse
except Exception as e:
print("Failed to import pydantic_ai / ModelResponse:", e)
raise
print("pydantic_ai version:", getattr(pydantic_ai, "__version__", "unknown"))
print("ModelResponse:", ModelResponse)
# Print annotated fields for ModelResponse
ann = getattr(ModelResponse, "__annotations__", None)
print("ModelResponse.__annotations__ keys:", sorted(list(ann.keys())) if ann else None)
# Try to find finish_reason field annotation
finish_ann = None
if ann and "finish_reason" in ann:
finish_ann = ann["finish_reason"]
print("finish_reason annotation:", finish_ann)
# Try to resolve/inspect any enums/literals involved in finish_reason
try:
from pydantic_ai.messages import FinishReason
print("FinishReason:", FinishReason)
print("FinishReason members/values:")
if hasattr(FinishReason, "__members__"):
print(list(FinishReason.__members__.items())[:50])
else:
print("Non-enum type:", FinishReason)
except Exception as e:
print("Could not import FinishReason from pydantic_ai.messages:", e)
# If finish_reason is a Literal, show its args
if finish_ann is not None:
origin = typing.get_origin(finish_ann)
args = typing.get_args(finish_ann)
print("finish_reason origin:", origin)
print("finish_reason args (first 30):", list(args)[:30])
PYRepository: lightspeed-core/lightspeed-stack Length of output: 294 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Find dependency pins for pydantic-ai / pydantic_ai
ls -la
fd -a -t f "pyproject.toml|requirements*.txt|uv.lock|poetry.lock|Pipfile.lock" . 2>/dev/null || true
rg -n "pydantic[-_]ai|pydantic_ai" -S pyproject.toml requirements*.txt poetry.lock uv.lock Pipfile.lock 2>/dev/null || true
# Also show the relevant dependency section if pyproject exists
if [ -f pyproject.toml ]; then
sed -n '1,200p' pyproject.toml
fiRepository: lightspeed-core/lightspeed-stack Length of output: 9409 🌐 Web query:
💡 Result: In Pydantic AI, FinishReason, tool_calls, and tool_output are core concepts related to how an agent manages model responses and tool execution. FinishReason The FinishReason is an enumeration that describes why a model stopped generating a response, normalized to OpenTelemetry values [1]. It includes the following possible values: 'stop', 'length', 'content_filter', 'tool_call', and 'error' [1]. Developers can use the finish_reason attribute on a ModelResponse to inspect why a model ended its generation, for instance, to implement fallback logic if a response was truncated due to length limits or a content filter [2]. Tool Calls and Tool Output Pydantic AI distinguishes between function tools (which perform actions) and output tools (which produce the final result of a run) [3][4]. 1. Tool Calls: When a model decides to use a tool, it emits a tool call. If the model emits a tool call, the finish_reason for that response is typically 'tool_call' [1][5]. 2. Tool Output: This is the data returned after executing a tool. In the default "Tool Output" mode, the JSON schema for output types is provided to the model as a special output tool [6][7]. 3. Managing Execution: Recent updates, such as those in PR Citations:
🏁 Script executed: #!/bin/bash
set -euo pipefail
rg -n "get_agent_finish_reason\\(|finish_reason=.*tool" tests src -SRepository: lightspeed-core/lightspeed-stack Length of output: 882 🏁 Script executed: #!/bin/bash
set -euo pipefail
sed -n '200,280p' src/utils/agents/query.pyRepository: lightspeed-core/lightspeed-stack Length of output: 2918 🏁 Script executed: #!/bin/bash
set -euo pipefail
rg -n "\"tool_call\"|\"tool_calls\"|\"tool_output\"|tool_output|tool_calls" src tests -SRepository: lightspeed-core/lightspeed-stack Length of output: 28367 🌐 Web query:
💡 Result: In Pydantic AI, the FinishReason type is a TypeAlias representing the reason a model stopped generating a response [1]. It is defined as a Literal string type with the following valid values [1]: stop: The model reached a natural stop point or a provided stop sequence [1][2]. length: The maximum number of tokens specified in the request was reached [1][2]. content_filter: The generation was stopped due to a content filter flag [1][2]. tool_call: The model initiated a tool call [1]. error: The generation ended due to an error (e.g., failure or cancellation) [1][3][4]. The ModelResponse class includes a finish_reason attribute (of type FinishReason | None) that records this information [1]. This field is intended to normalize diverse provider-specific finish reasons into a standardized format aligned with OpenTelemetry conventions [1][5]. Individual model providers (such as OpenAI, Anthropic, and xAI) implement internal mapping logic to convert their native API response finish reasons into these standardized Pydantic AI FinishReason values [3][6][7][8]. Citations:
Potential
Add explicit handling for 🤖 Prompt for AI Agents
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From pydantic-ai openai processor: _CHAT_FINISH_REASON_MAP: dict[
Literal['stop', 'length', 'tool_calls', 'content_filter', 'function_call'], FinishReason
] = {
'stop': 'stop',
'length': 'length',
'tool_calls': 'tool_call',
'content_filter': 'content_filter',
'function_call': 'tool_call',
}
_RESPONSES_FINISH_REASON_MAP: dict[Literal['max_output_tokens', 'content_filter'] | ResponseStatus, FinishReason] = {
'max_output_tokens': 'length',
'content_filter': 'content_filter',
'completed': 'stop',
'cancelled': 'error',
'failed': 'error',
}OpenResponses cannot result in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
✏️ Learnings added
|
||
|
|
||
| def get_finish_reason_error( | ||
| finish_reason: AgentFinishReason, | ||
| model_id: str, | ||
| ) -> AbstractErrorResponse: | ||
| """Map a non-success agent finish reason to an LCS error response. | ||
|
|
||
| Args: | ||
| finish_reason: Resolved finish reason from :func:`get_agent_finish_reason`. | ||
| model_id: Model identifier in provider/model format. | ||
|
|
||
| Returns: | ||
| Structured error response for HTTP or SSE error events. | ||
| """ | ||
| match finish_reason: | ||
| case AgentFinishReason.LENGTH: | ||
| return PromptTooLongResponse(model=model_id) | ||
| case AgentFinishReason.CONTENT_FILTER: | ||
| return InternalServerErrorResponse.query_failed( | ||
| "The model refused to generate a response due to content policy." | ||
| ) | ||
| case AgentFinishReason.CANCELLED: | ||
| return InternalServerErrorResponse.query_failed( | ||
| "The response was cancelled before completion." | ||
| ) | ||
| case _: | ||
| return InternalServerErrorResponse.query_failed( | ||
| "An unexpected error occurred while processing the request." | ||
| ) | ||
|
|
||
|
|
||
| def extract_agent_token_usage( | ||
| usage: RunUsage, | ||
| model: str, | ||
| endpoint_path: str, | ||
| ) -> TokenCounter: | ||
| """Build token usage for a completed agent run and record related metrics. | ||
|
|
||
| Args: | ||
| usage: Run usage reported by the agent. | ||
| model: Model identifier in provider/model format. | ||
| endpoint_path: Endpoint path used for metric labeling. | ||
|
|
||
| Returns: | ||
| Aggregated token usage counter for the run. | ||
| """ | ||
| provider_id, model_id = extract_provider_and_model_from_model_id(model) | ||
| token_counter = TokenCounter( | ||
| input_tokens=usage.input_tokens, | ||
| output_tokens=usage.output_tokens, | ||
| llm_calls=max(usage.requests, 1), | ||
| ) | ||
| logger.debug( | ||
| "Extracted token usage from agent run: input=%d, output=%d, requests=%d", | ||
| token_counter.input_tokens, | ||
| token_counter.output_tokens, | ||
| usage.requests, | ||
| ) | ||
| recording.record_llm_token_usage( | ||
| provider_id, | ||
| model_id, | ||
| token_counter.input_tokens, | ||
| token_counter.output_tokens, | ||
| endpoint_path, | ||
| ) | ||
| recording.record_llm_call(provider_id, model_id, endpoint_path) | ||
| return token_counter | ||
|
|
||
|
|
||
| def build_turn_summary_from_agent_run( | ||
| run_result: AgentRunResult[str], | ||
| *, | ||
| model_id: str, | ||
| endpoint_path: str, | ||
| vector_store_ids: list[str], | ||
| rag_id_mapping: dict[str, str], | ||
| ) -> TurnSummary: | ||
| """Build a turn summary from a completed agent run. | ||
|
|
||
| Args: | ||
| run_result: Completed agent run result. | ||
| model_id: Model identifier in provider/model format. | ||
| endpoint_path: Endpoint path used for metric labeling. | ||
| vector_store_ids: Vector store IDs used for source mapping. | ||
| rag_id_mapping: Mapping from vector store IDs to user-facing source labels. | ||
|
|
||
| Returns: | ||
| Turn summary with text, tools, RAG metadata, and token usage. | ||
|
|
||
| Raises: | ||
| HTTPException: When the run failed. | ||
| """ | ||
| finish_reason = get_agent_finish_reason(run_result.response) | ||
| if finish_reason != AgentFinishReason.SUCCESS: | ||
| error_response = get_finish_reason_error(finish_reason, model_id) | ||
| raise HTTPException(**error_response.model_dump()) | ||
|
|
||
| state = AgentTurnAccumulator( | ||
| vector_store_ids=vector_store_ids, | ||
| rag_id_mapping=rag_id_mapping, | ||
| turn_summary=TurnSummary(), | ||
| ) | ||
|
|
||
| for message in run_result.new_messages(): | ||
| if isinstance(message, ModelResponse): | ||
| if message.text: | ||
| state.turn_summary.llm_response = message.text | ||
|
asimurka marked this conversation as resolved.
|
||
| for tool_call_part in message.tool_calls: | ||
| process_function_tool_call(state, tool_call_part) | ||
| for call_part, return_part in message.native_tool_calls: | ||
| process_native_tool_call(state, call_part) | ||
| process_native_tool_result(state, return_part) | ||
| elif isinstance(message, ModelRequest): | ||
| for request_part in message.parts: | ||
| if isinstance(request_part, ToolReturnPart): | ||
| process_function_tool_result(state, request_part) | ||
|
|
||
| state.turn_summary.id = run_result.response.provider_response_id or "" | ||
| state.turn_summary.token_usage = extract_agent_token_usage( | ||
| run_result.usage, | ||
| model_id, | ||
| endpoint_path, | ||
| ) | ||
| return state.turn_summary | ||
|
|
||
|
|
||
| async def retrieve_agent_response( | ||
| client: AsyncLlamaStackClient, | ||
| responses_params: ResponsesApiParams, | ||
| moderation_result: ShieldModerationResult, | ||
| endpoint_path: str, | ||
| ) -> TurnSummary: | ||
| """Retrieve a turn summary from a blocking agent run. | ||
|
|
||
| Mirrors :func:`app.endpoints.query.retrieve_response` for the agent path. | ||
|
|
||
| Args: | ||
| client: Llama Stack client for conversation persistence on moderation block. | ||
| responses_params: Prepared Responses API parameters. | ||
| moderation_result: Shield moderation outcome for the turn. | ||
| endpoint_path: Endpoint path used for metric labeling. | ||
|
|
||
| Returns: | ||
| Turn summary for the completed agent run. | ||
|
|
||
| Raises: | ||
| HTTPException: On moderation is not applicable; on agent or provider failure. | ||
| """ | ||
|
asimurka marked this conversation as resolved.
|
||
| if moderation_result.decision == "blocked": | ||
| await append_turn_items_to_conversation( | ||
| client, | ||
| responses_params.conversation, | ||
| responses_params.input, | ||
| [moderation_result.refusal_response], | ||
| ) | ||
| return TurnSummary( | ||
| id=moderation_result.moderation_id, | ||
| llm_response=moderation_result.message, | ||
| ) | ||
| try: | ||
| agent = build_agent(client, responses_params) | ||
| logger.debug("Starting agent non-streaming response processing") | ||
| run_result = await agent.run(cast(str, responses_params.input)) | ||
|
asimurka marked this conversation as resolved.
|
||
| except (AgentRunError, APIStatusError, APIConnectionError, RuntimeError) as exc: | ||
| response = map_agent_inference_error(exc, responses_params.model) | ||
| raise HTTPException(**response.model_dump()) from exc | ||
|
|
||
| vector_store_ids = extract_vector_store_ids_from_tools(responses_params.tools) | ||
| rag_id_mapping = configuration.rag_id_mapping | ||
| return build_turn_summary_from_agent_run( | ||
| run_result, | ||
| model_id=responses_params.model, | ||
| endpoint_path=endpoint_path, | ||
| vector_store_ids=vector_store_ids, | ||
| rag_id_mapping=rag_id_mapping, | ||
| ) | ||
Uh oh!
There was an error while loading. Please reload this page.