Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 328 additions & 0 deletions src/utils/agents/query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,328 @@
"""Non-streaming agent helpers and shared turn-summary builders for agent runs."""

from __future__ import annotations

from enum import Enum
from typing import TypeAlias, cast

from fastapi import HTTPException
from llama_stack_client import APIConnectionError, APIStatusError, AsyncLlamaStackClient
from pydantic_ai.exceptions import (
AgentRunError,
ContentFilterError,
IncompleteToolCall,
ModelAPIError,
ModelHTTPError,
UnexpectedModelBehavior,
UsageLimitExceeded,
)
from pydantic_ai.messages import ModelRequest, ModelResponse, ToolReturnPart
from pydantic_ai.run import AgentRunResult
from pydantic_ai.usage import RunUsage

from configuration import configuration
from log import get_logger
from metrics import recording
from models.api.responses.error import (
AbstractErrorResponse,
InternalServerErrorResponse,
PromptTooLongResponse,
QuotaExceededResponse,
ServiceUnavailableResponse,
)
from models.common.agents import AgentTurnAccumulator
from models.common.moderation import ShieldModerationResult
from models.common.responses.responses_api_params import ResponsesApiParams
from models.common.turn_summary import TurnSummary
from utils.agents.tool_processor import (
process_function_tool_call,
process_function_tool_result,
process_native_tool_call,
process_native_tool_result,
)
from utils.conversations import append_turn_items_to_conversation
from utils.pydantic_ai import build_agent
from utils.query import (
extract_provider_and_model_from_model_id,
handle_known_apistatus_errors,
is_context_length_error,
)
from utils.responses import extract_vector_store_ids_from_tools
from utils.token_counter import TokenCounter

logger = get_logger(__name__)

AgentInferenceError: TypeAlias = (
AgentRunError | APIStatusError | APIConnectionError | RuntimeError
)


class AgentFinishReason(str, Enum):
"""Finish reason for a completed agent model response."""

CONTENT_FILTER = "content_filter"
CANCELLED = "cancelled"
SUCCESS = "stop"
LENGTH = "length"
ERROR = "error"


def map_agent_inference_error(
exc: AgentInferenceError,
model_id: str,
) -> AbstractErrorResponse:
"""Map agent run failures from pydantic-ai or Llama Stack to an LCS error response.

Args:
exc: Agent, HTTP status, connection, or context-length runtime error.
model_id: Model identifier in provider/model format.

Returns:
Structured error response for HTTP or SSE error events.

Raises:
RuntimeError: Re-raised when ``exc`` is a non-agent ``RuntimeError`` that is
not a recognized context-length failure.
"""
Comment thread
asimurka marked this conversation as resolved.
match exc:
case AgentRunError() as agent_exc:
return map_pydantic_agent_run_error(agent_exc, model_id)
case APIStatusError() as status_exc:
return handle_known_apistatus_errors(status_exc, model_id)
case APIConnectionError() as connection_exc:
return ServiceUnavailableResponse(
backend_name="Llama Stack",
cause=str(connection_exc),
)
case RuntimeError() as runtime_exc if is_context_length_error(str(runtime_exc)):
return PromptTooLongResponse(model=model_id)
case _:
return InternalServerErrorResponse.generic()


def map_pydantic_agent_run_error(
exc: AgentRunError, model_id: str
) -> AbstractErrorResponse:
"""Map pydantic-ai ``AgentRunError`` subclasses to LCS error responses.

Args:
exc: Agent exception to map.
model_id: Model identifier in provider/model format.

Returns:
Structured error response for HTTP or SSE error events.
"""
match exc:
case ContentFilterError() as filter_exc:
return InternalServerErrorResponse.query_failed(str(filter_exc))
case IncompleteToolCall() | UnexpectedModelBehavior():
return PromptTooLongResponse(model=model_id)
case UsageLimitExceeded():
return QuotaExceededResponse.model(model_id)
case ModelHTTPError() as http_exc if is_context_length_error(str(http_exc)):
return PromptTooLongResponse(model=model_id)
case ModelHTTPError(status_code=429):
return QuotaExceededResponse.model(model_id)
case ModelHTTPError():
return InternalServerErrorResponse.generic()
case ModelAPIError() as api_exc:
return ServiceUnavailableResponse(
backend_name="Llama Stack",
cause=str(api_exc),
)
case _:
return InternalServerErrorResponse.query_failed(str(exc))


def get_agent_finish_reason(response: ModelResponse) -> AgentFinishReason:
"""Get the finish reason from a completed agent model response.

Args:
response: Last model response from the agent run.

Returns:
Resolved finish reason.
"""
raw_finish_reason = (response.provider_details or {}).get("finish_reason")
if raw_finish_reason == "cancelled":
return AgentFinishReason.CANCELLED
if response.finish_reason is None:
return AgentFinishReason.ERROR
return AgentFinishReason(response.finish_reason)

Comment on lines +149 to +152

@coderabbitai coderabbitai Bot Jun 9, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check pydantic-ai FinishReason definition to confirm possible values
rg -n "FinishReason" --type py -A 5

Repository: lightspeed-core/lightspeed-stack

Length of output: 7310


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show the relevant enum + get_agent_finish_reason implementation
sed -n '40,180p' src/utils/agents/query.py

# Show the unit tests for get_agent_finish_reason
sed -n '120,220p' tests/unit/utils/agents/test_query.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 8661


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate imports/definitions around ModelResponse / FinishReason usage
rg -n "from .* import .*ModelResponse|ModelResponse" src/utils/agents/query.py src/utils -S

# Find how finish_reason is typed/used elsewhere
rg -n "\"tool_calls\"|\"tool_output\"|finish_reason\s*:" src -S

# Show utils.pydantic_ai module (likely where ModelResponse/FinishReason are re-exported)
fd -a -t f "pydantic_ai.py" src/utils -x echo
fd -a -t f "pydantic_ai*" src/utils -x echo

Repository: lightspeed-core/lightspeed-stack

Length of output: 1254


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect utils/pydantic_ai.py for re-exports/types that define ModelResponse / FinishReason
sed -n '1,220p' src/utils/pydantic_ai.py

# Find any references to "finish_reason" across the repo
rg -n "finish_reason" src -S

# Inspect pydantic-ai ModelResponse type hints only if vendored/aliased in this repo;
# otherwise, we can at least see whether code ever expects tool_calls/tool_output.
rg -n "\"tool_calls\"|\"tool_output\"" src -S

Repository: lightspeed-core/lightspeed-stack

Length of output: 5568


🏁 Script executed:

#!/bin/bash
set -euo pipefail

python3 - <<'PY'
import inspect
import typing
try:
    import pydantic_ai
    from pydantic_ai.messages import ModelResponse
except Exception as e:
    print("Failed to import pydantic_ai / ModelResponse:", e)
    raise

print("pydantic_ai version:", getattr(pydantic_ai, "__version__", "unknown"))
print("ModelResponse:", ModelResponse)

# Print annotated fields for ModelResponse
ann = getattr(ModelResponse, "__annotations__", None)
print("ModelResponse.__annotations__ keys:", sorted(list(ann.keys())) if ann else None)

# Try to find finish_reason field annotation
finish_ann = None
if ann and "finish_reason" in ann:
    finish_ann = ann["finish_reason"]
print("finish_reason annotation:", finish_ann)

# Try to resolve/inspect any enums/literals involved in finish_reason
try:
    from pydantic_ai.messages import FinishReason
    print("FinishReason:", FinishReason)
    print("FinishReason members/values:")
    if hasattr(FinishReason, "__members__"):
        print(list(FinishReason.__members__.items())[:50])
    else:
        print("Non-enum type:", FinishReason)
except Exception as e:
    print("Could not import FinishReason from pydantic_ai.messages:", e)

# If finish_reason is a Literal, show its args
if finish_ann is not None:
    origin = typing.get_origin(finish_ann)
    args = typing.get_args(finish_ann)
    print("finish_reason origin:", origin)
    print("finish_reason args (first 30):", list(args)[:30])
PY

Repository: lightspeed-core/lightspeed-stack

Length of output: 294


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find dependency pins for pydantic-ai / pydantic_ai
ls -la
fd -a -t f "pyproject.toml|requirements*.txt|uv.lock|poetry.lock|Pipfile.lock" . 2>/dev/null || true

rg -n "pydantic[-_]ai|pydantic_ai" -S pyproject.toml requirements*.txt poetry.lock uv.lock Pipfile.lock 2>/dev/null || true

# Also show the relevant dependency section if pyproject exists
if [ -f pyproject.toml ]; then
  sed -n '1,200p' pyproject.toml
fi

Repository: lightspeed-core/lightspeed-stack

Length of output: 9409


🌐 Web query:

pydantic-ai 1.105.0 FinishReason tool_calls tool_output

💡 Result:

In Pydantic AI, FinishReason, tool_calls, and tool_output are core concepts related to how an agent manages model responses and tool execution. FinishReason The FinishReason is an enumeration that describes why a model stopped generating a response, normalized to OpenTelemetry values [1]. It includes the following possible values: 'stop', 'length', 'content_filter', 'tool_call', and 'error' [1]. Developers can use the finish_reason attribute on a ModelResponse to inspect why a model ended its generation, for instance, to implement fallback logic if a response was truncated due to length limits or a content filter [2]. Tool Calls and Tool Output Pydantic AI distinguishes between function tools (which perform actions) and output tools (which produce the final result of a run) [3][4]. 1. Tool Calls: When a model decides to use a tool, it emits a tool call. If the model emits a tool call, the finish_reason for that response is typically 'tool_call' [1][5]. 2. Tool Output: This is the data returned after executing a tool. In the default "Tool Output" mode, the JSON schema for output types is provided to the model as a special output tool [6][7]. 3. Managing Execution: Recent updates, such as those in PR #5339 (which influenced behavior leading up to v1.105.0), refined how these are handled [8][9]. Specifically: - The end_strategy parameter (e.g., 'graceful', 'exhaustive') controls the execution flow when a model emits both function tools and an output tool in the same response [4][7]. - 'Graceful' ensures function tools are executed even if a final result is found, while 'exhaustive' ensures all tools—including output tools—are executed [4]. - ToolOutput can be used as a marker class to customize output tools [6][7]. Pydantic AI v1.105.0 was released on June 2, 2026, and focused on features like on-demand loading and Grok model support [8]. The concepts of FinishReason and tool management remain central to the framework's operational flow [1][6].

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail
rg -n "get_agent_finish_reason\\(|finish_reason=.*tool" tests src -S

Repository: lightspeed-core/lightspeed-stack

Length of output: 882


🏁 Script executed:

#!/bin/bash
set -euo pipefail
sed -n '200,280p' src/utils/agents/query.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 2918


🏁 Script executed:

#!/bin/bash
set -euo pipefail
rg -n "\"tool_call\"|\"tool_calls\"|\"tool_output\"|tool_output|tool_calls" src tests -S

Repository: lightspeed-core/lightspeed-stack

Length of output: 28367


🌐 Web query:

pydantic-ai FinishReason enum values stop length content_filter tool_call error ModelResponse.finish_reason

💡 Result:

In Pydantic AI, the FinishReason type is a TypeAlias representing the reason a model stopped generating a response [1]. It is defined as a Literal string type with the following valid values [1]: stop: The model reached a natural stop point or a provided stop sequence [1][2]. length: The maximum number of tokens specified in the request was reached [1][2]. content_filter: The generation was stopped due to a content filter flag [1][2]. tool_call: The model initiated a tool call [1]. error: The generation ended due to an error (e.g., failure or cancellation) [1][3][4]. The ModelResponse class includes a finish_reason attribute (of type FinishReason | None) that records this information [1]. This field is intended to normalize diverse provider-specific finish reasons into a standardized format aligned with OpenTelemetry conventions [1][5]. Individual model providers (such as OpenAI, Anthropic, and xAI) implement internal mapping logic to convert their native API response finish reasons into these standardized Pydantic AI FinishReason values [3][6][7][8].

Citations:


Potential ValueError when finish_reason is tool_call (or otherwise unknown)

get_agent_finish_reason() constructs AgentFinishReason(response.finish_reason) without a guard. pydantic-ai’s FinishReason includes "tool_call", which is not a member of the local AgentFinishReason enum, so this will raise ValueError instead of mapping to an LCS error.

Add explicit handling for tool_call (or catch ValueError and map unknown finish reasons to a safe fallback like AgentFinishReason.ERROR) rather than directly converting to the enum.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/utils/agents/query.py` around lines 149 - 152, In
get_agent_finish_reason, avoid directly constructing
AgentFinishReason(response.finish_reason) because response.finish_reason can be
"tool_call" (or other unknown values) which will raise ValueError; update the
function to either explicitly check for "tool_call" on response.finish_reason
and return AgentFinishReason.ERROR, or wrap the enum conversion in a try/except
ValueError that returns AgentFinishReason.ERROR as a safe fallback, referencing
response.finish_reason and AgentFinishReason to locate the change.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From pydantic-ai openai processor:

_CHAT_FINISH_REASON_MAP: dict[
    Literal['stop', 'length', 'tool_calls', 'content_filter', 'function_call'], FinishReason
] = {
    'stop': 'stop',
    'length': 'length',
    'tool_calls': 'tool_call',
    'content_filter': 'content_filter',
    'function_call': 'tool_call',
}

_RESPONSES_FINISH_REASON_MAP: dict[Literal['max_output_tokens', 'content_filter'] | ResponseStatus, FinishReason] = {
    'max_output_tokens': 'length',
    'content_filter': 'content_filter',
    'completed': 'stop',
    'cancelled': 'error',
    'failed': 'error',
}

OpenResponses cannot result in tool_call finish reason.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asimurka, thanks for clarifying — you're right. Since this code targets the OpenAI Responses API (not Chat Completions), _RESPONSES_FINISH_REASON_MAP applies, and "tool_call" is not a possible finish reason from that path. All values it can emit — stop, length, content_filter, error — are covered by AgentFinishReason. My concern doesn't apply here.


✏️ Learnings added
Learnt from: asimurka
Repo: lightspeed-core/lightspeed-stack PR: 1880
File: src/utils/agents/query.py:149-152
Timestamp: 2026-06-09T07:36:38.363Z
Learning: In `src/utils/agents/query.py` (lightspeed-core/lightspeed-stack), `get_agent_finish_reason` and `AgentFinishReason` enum are designed specifically for the pydantic-ai OpenAI **Responses API** (not Chat Completions). The Responses API `_RESPONSES_FINISH_REASON_MAP` only produces `stop`, `length`, `content_filter`, and `error` as finish reasons — `tool_call` cannot occur from this API path. Therefore, a `ValueError` from `AgentFinishReason(response.finish_reason)` is not a concern in this context.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


def get_finish_reason_error(
finish_reason: AgentFinishReason,
model_id: str,
) -> AbstractErrorResponse:
"""Map a non-success agent finish reason to an LCS error response.

Args:
finish_reason: Resolved finish reason from :func:`get_agent_finish_reason`.
model_id: Model identifier in provider/model format.

Returns:
Structured error response for HTTP or SSE error events.
"""
match finish_reason:
case AgentFinishReason.LENGTH:
return PromptTooLongResponse(model=model_id)
case AgentFinishReason.CONTENT_FILTER:
return InternalServerErrorResponse.query_failed(
"The model refused to generate a response due to content policy."
)
case AgentFinishReason.CANCELLED:
return InternalServerErrorResponse.query_failed(
"The response was cancelled before completion."
)
case _:
return InternalServerErrorResponse.query_failed(
"An unexpected error occurred while processing the request."
)


def extract_agent_token_usage(
usage: RunUsage,
model: str,
endpoint_path: str,
) -> TokenCounter:
"""Build token usage for a completed agent run and record related metrics.

Args:
usage: Run usage reported by the agent.
model: Model identifier in provider/model format.
endpoint_path: Endpoint path used for metric labeling.

Returns:
Aggregated token usage counter for the run.
"""
provider_id, model_id = extract_provider_and_model_from_model_id(model)
token_counter = TokenCounter(
input_tokens=usage.input_tokens,
output_tokens=usage.output_tokens,
llm_calls=max(usage.requests, 1),
)
logger.debug(
"Extracted token usage from agent run: input=%d, output=%d, requests=%d",
token_counter.input_tokens,
token_counter.output_tokens,
usage.requests,
)
recording.record_llm_token_usage(
provider_id,
model_id,
token_counter.input_tokens,
token_counter.output_tokens,
endpoint_path,
)
recording.record_llm_call(provider_id, model_id, endpoint_path)
return token_counter


def build_turn_summary_from_agent_run(
run_result: AgentRunResult[str],
*,
model_id: str,
endpoint_path: str,
vector_store_ids: list[str],
rag_id_mapping: dict[str, str],
) -> TurnSummary:
"""Build a turn summary from a completed agent run.

Args:
run_result: Completed agent run result.
model_id: Model identifier in provider/model format.
endpoint_path: Endpoint path used for metric labeling.
vector_store_ids: Vector store IDs used for source mapping.
rag_id_mapping: Mapping from vector store IDs to user-facing source labels.

Returns:
Turn summary with text, tools, RAG metadata, and token usage.

Raises:
HTTPException: When the run failed.
"""
finish_reason = get_agent_finish_reason(run_result.response)
if finish_reason != AgentFinishReason.SUCCESS:
error_response = get_finish_reason_error(finish_reason, model_id)
raise HTTPException(**error_response.model_dump())

state = AgentTurnAccumulator(
vector_store_ids=vector_store_ids,
rag_id_mapping=rag_id_mapping,
turn_summary=TurnSummary(),
)

for message in run_result.new_messages():
if isinstance(message, ModelResponse):
if message.text:
state.turn_summary.llm_response = message.text
Comment thread
asimurka marked this conversation as resolved.
for tool_call_part in message.tool_calls:
process_function_tool_call(state, tool_call_part)
for call_part, return_part in message.native_tool_calls:
process_native_tool_call(state, call_part)
process_native_tool_result(state, return_part)
elif isinstance(message, ModelRequest):
for request_part in message.parts:
if isinstance(request_part, ToolReturnPart):
process_function_tool_result(state, request_part)

state.turn_summary.id = run_result.response.provider_response_id or ""
state.turn_summary.token_usage = extract_agent_token_usage(
run_result.usage,
model_id,
endpoint_path,
)
return state.turn_summary


async def retrieve_agent_response(
client: AsyncLlamaStackClient,
responses_params: ResponsesApiParams,
moderation_result: ShieldModerationResult,
endpoint_path: str,
) -> TurnSummary:
"""Retrieve a turn summary from a blocking agent run.

Mirrors :func:`app.endpoints.query.retrieve_response` for the agent path.

Args:
client: Llama Stack client for conversation persistence on moderation block.
responses_params: Prepared Responses API parameters.
moderation_result: Shield moderation outcome for the turn.
endpoint_path: Endpoint path used for metric labeling.

Returns:
Turn summary for the completed agent run.

Raises:
HTTPException: On moderation is not applicable; on agent or provider failure.
"""
Comment thread
asimurka marked this conversation as resolved.
if moderation_result.decision == "blocked":
await append_turn_items_to_conversation(
client,
responses_params.conversation,
responses_params.input,
[moderation_result.refusal_response],
)
return TurnSummary(
id=moderation_result.moderation_id,
llm_response=moderation_result.message,
)
try:
agent = build_agent(client, responses_params)
logger.debug("Starting agent non-streaming response processing")
run_result = await agent.run(cast(str, responses_params.input))
Comment thread
asimurka marked this conversation as resolved.
except (AgentRunError, APIStatusError, APIConnectionError, RuntimeError) as exc:
response = map_agent_inference_error(exc, responses_params.model)
raise HTTPException(**response.model_dump()) from exc

vector_store_ids = extract_vector_store_ids_from_tools(responses_params.tools)
rag_id_mapping = configuration.rag_id_mapping
return build_turn_summary_from_agent_run(
run_result,
model_id=responses_params.model,
endpoint_path=endpoint_path,
vector_store_ids=vector_store_ids,
rag_id_mapping=rag_id_mapping,
)
Loading
Loading