Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,32 @@ pipeline.run(data={"retriever": {"query": query}, "agent": {"messages": [], "que
If the prompt itself must still be assembled per run, build `ChatMessage` objects before the `Agent` (e.g. with a `ChatPromptBuilder`) and pass them through the `messages` input.
For a runtime system prompt, construct an `Agent` without `system_prompt` or `user_prompt` and include a system message at the start of `messages`.

#### Reserved `state_schema` keys for built-in run metadata

**What changed:** `Agent` now auto-populates three new outputs — `step_count`, `token_usage`, and `tool_call_counts` — and reserves those names in its `state_schema`. Passing any of them as a `state_schema` key now raises `ValueError`.

**Why:** These keys are managed by `Agent` itself and exposed as outputs only; allowing users to redefine them would let an input shadow the value the Agent is trying to write.

**How to migrate:** Rename any clashing `state_schema` entries.

Before (v2.x):
```python
agent = Agent(
chat_generator=...,
tools=[...],
state_schema={"token_usage": {"type": dict}},
)
```

After (v3.0):
```python
agent = Agent(
chat_generator=...,
tools=[...],
state_schema={"my_token_usage": {"type": dict}},
)
```

### LLM

#### Runtime `user_prompt` and `system_prompt` removed from `LLM.run` / `LLM.run_async`
Expand Down
133 changes: 118 additions & 15 deletions haystack/components/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import contextvars
import inspect
import re
from copy import deepcopy
from dataclasses import dataclass
from typing import Any, Literal, cast

Expand Down Expand Up @@ -48,6 +49,77 @@
# Regex to extract the role from a Jinja2 message block, e.g. {% message role="user" %}
_JINJA2_MESSAGE_ROLE_RE = re.compile(r'\{%\s*message\s+role\s*=\s*["\'](\w+)["\']')

# State keys that the Agent populates automatically during a run.
# Users may not define them in their own `state_schema`, and they are exposed only as Agent outputs.
_INTERNAL_STATE_KEYS: dict[str, dict[str, Any]] = {
"step_count": {"type": int, "handler": replace_values},
"token_usage": {"type": dict[str, Any], "handler": replace_values},
"tool_call_counts": {"type": dict[str, int], "handler": replace_values},
}


def _accumulate_usage(current: Any, new: Any) -> Any:
"""
Recursively sum numeric leaf values across two usage-like dicts.

Used to aggregate `ChatMessage.meta["usage"]` payloads across LLM calls in a run. Nested dicts (e.g. OpenAI's
`completion_tokens_details`) are merged recursively; numeric leaves are summed; other types fall back to the new
value.

:param current: The current accumulated usage data.
:param new: The new usage data to merge in.
"""
if isinstance(current, dict) and isinstance(new, dict):
result = dict(current)
for k, v in new.items():
result[k] = _accumulate_usage(result[k], v) if k in result else deepcopy(v)
return result
if isinstance(current, (int, float)) and isinstance(new, (int, float)):
return current + new
return new


def _record_llm_usage(state: State, llm_messages: list[ChatMessage]) -> None:
"""
Aggregate token usage from the latest LLM messages into the State.

Only writes when at least one message reports `meta["usage"]`, so generators that don't surface usage data
leave `token_usage` at its default empty dict rather than overwriting it.

:param state: The Agent's State, used to read the running `token_usage` total and write back the new total.
:param llm_messages: The ChatMessage objects returned from the latest LLM call. Token usage is read from each
message's `meta["usage"]` field, if present.
"""
current = state.get("token_usage")
updated = False
for msg in llm_messages:
usage = msg.meta.get("usage")
if isinstance(usage, dict):
current = _accumulate_usage(current or {}, usage)
updated = True
if updated:
state.set("token_usage", current)


def _record_tool_calls(state: State, tool_messages: list[ChatMessage]) -> None:
"""
Increment per-tool call counts in the State for every successfully dispatched tool.

:param state: The Agent's State, used to read the running `tool_call_counts` map and write back the new totals.
:param tool_messages: The ChatMessage objects returned from the latest tool execution. Per-tool counts are
incremented based on each message's `tool_call_result.origin.tool_name`.
"""
counts = state.get("tool_call_counts") or {}
updated = False
for tm in tool_messages:
if tm.tool_call_result is None:
continue
name = tm.tool_call_result.origin.tool_name
counts[name] = counts.get(name, 0) + 1
updated = True
if updated:
state.set("tool_call_counts", counts)


def _get_run_method_params(instance: "Agent") -> set[str]:
"""Derive the parameter names of the Agent.run method via introspection."""
Expand Down Expand Up @@ -292,7 +364,8 @@ def __init__(
with `"type"` (required) and an optional `"handler"` for merging values across tool calls.
Tools can read from and write to state keys using `inputs_from_state` and `outputs_to_state`.
:param max_agent_steps: Maximum number of steps the agent will run before stopping. Defaults to 100.
If the agent exceeds this number of steps, it will stop and return the current state.
A step is one chat-generator call plus the execution of every tool call the model requested in
that call (if any). If the agent reaches this number of steps it stops and returns the current state.
:param streaming_callback: A callback that will be invoked when a response is streamed from the LLM.
The same callback can be configured to emit tool results when a tool is called.
:param raise_on_tool_invocation_failure: Should the agent raise an exception when a tool invocation fails?
Expand Down Expand Up @@ -324,6 +397,12 @@ def __init__(
)

if state_schema is not None:
reserved_used = sorted(set(state_schema) & _INTERNAL_STATE_KEYS.keys())
if reserved_used:
raise ValueError(
f"state_schema keys {reserved_used} are reserved for Agent internal state and "
f"cannot be redefined. Reserved keys: {sorted(_INTERNAL_STATE_KEYS)}."
)
_validate_schema(state_schema)
_validate_prompt_message_blocks(user_prompt, system_prompt)
if tool_concurrency_limit < 1:
Expand All @@ -350,13 +429,16 @@ def __init__(
self.state_schema = dict(self._state_schema)
if self.state_schema.get("messages") is None:
self.state_schema["messages"] = {"type": list[ChatMessage], "handler": merge_lists}
for key, config in _INTERNAL_STATE_KEYS.items():
self.state_schema[key] = dict(config)

# --- Component I/O ---
self._run_method_params = _get_run_method_params(self)
output_types = {"last_message": ChatMessage}
output_types: dict[str, Any] = {"last_message": ChatMessage}
for param, config in self.state_schema.items():
output_types[param] = config["type"]
if param not in self._run_method_params:
# Internal state keys are populated internally by the Agent itself and are not exposed as inputs
if param not in self._run_method_params and param not in _INTERNAL_STATE_KEYS:
component.set_input_type(self, name=param, type=config["type"], default=None)
component.set_output_types(self, **output_types)

Expand Down Expand Up @@ -569,15 +651,18 @@ def _initialize_fresh_execution(
if all(m.is_from(ChatRole.SYSTEM) for m in messages):
logger.warning("All messages provided to the Agent component are system messages. This is not recommended.")

selected_tools = self._select_tools(tools)

state_kwargs: dict[str, Any] = {key: kwargs[key] for key in self.state_schema.keys() if key in kwargs}
state = State(schema=self.state_schema, data=state_kwargs)
state.set("messages", messages)
state.set("step_count", 0)
state.set("token_usage", {})
state.set("tool_call_counts", {tool.name: 0 for tool in flatten_tools_or_toolsets(selected_tools)})

streaming_callback = select_streaming_callback( # type: ignore[call-overload]
init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=requires_async
)

selected_tools = self._select_tools(tools)
generator_inputs: dict[str, Any] = {}
if self._chat_generator_supports_tools:
generator_inputs["tools"] = selected_tools
Expand Down Expand Up @@ -669,6 +754,12 @@ def run(
A dictionary with the following keys:
- "messages": List of all messages exchanged during the agent's run.
- "last_message": The last message exchanged during the agent's run.
- "step_count": The number of steps the agent ran. A step is one chat-generator call plus the
execution of every tool call the model requested in that call (if any). The counter is incremented
after each step completes, including the final step that hits an exit condition or `max_agent_steps`.
- "token_usage": Aggregated token usage from every LLM call in the run, summed from each LLM message's
`meta["usage"]`.
- "tool_call_counts": Mapping of tool name to the number of times that tool was invoked.
- Any additional keys defined in the `state_schema`.
"""
agent_inputs = {"messages": messages, "streaming_callback": streaming_callback, **kwargs}
Expand Down Expand Up @@ -738,6 +829,12 @@ async def run_async(
A dictionary with the following keys:
- "messages": List of all messages exchanged during the agent's run.
- "last_message": The last message exchanged during the agent's run.
- "step_count": The number of steps the agent ran. A step is one chat-generator call plus the
execution of every tool call the model requested in that call (if any). The counter is incremented
after each step completes, including the final step that hits an exit condition or `max_agent_steps`.
- "token_usage": Aggregated token usage from every LLM call in the run, summed from each LLM message's
`meta["usage"]`.
- "tool_call_counts": Mapping of tool name to the number of times that tool was invoked.
- Any additional keys defined in the `state_schema`.
"""
agent_inputs = {"messages": messages, "streaming_callback": streaming_callback, **kwargs}
Expand Down Expand Up @@ -787,9 +884,11 @@ def _run_step(self, exe_context: _ExecutionContext, agent_span: tracing.Span) ->
llm_span.set_content_tag("haystack.agent.step.llm.output", result)
llm_messages = result["replies"]
exe_context.state.set("messages", llm_messages)
_record_llm_usage(exe_context.state, llm_messages)

if not any(msg.tool_call for msg in llm_messages) or not self.tools:
exe_context.counter += 1
exe_context.state.set("step_count", exe_context.counter)
return False

modified_tool_call_messages, new_chat_history = _process_confirmation_strategies(
Expand All @@ -815,13 +914,14 @@ def _run_step(self, exe_context: _ExecutionContext, agent_span: tracing.Span) ->
"haystack.agent.step.tool.output", {"tool_messages": tool_messages, "state": exe_context.state}
)
exe_context.state.set("messages", tool_messages)

if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
exe_context.counter += 1
return False
_record_tool_calls(exe_context.state, tool_messages)

exe_context.counter += 1
return True
exe_context.state.set("step_count", exe_context.counter)
exit_triggered = self.exit_conditions != ["text"] and self._check_exit_conditions(
llm_messages, tool_messages
)
return not exit_triggered

async def _run_step_async(self, exe_context: _ExecutionContext, agent_span: tracing.Span) -> bool:
"""Execute one agent step asynchronously. Returns True to continue the loop, False to stop."""
Expand All @@ -848,9 +948,11 @@ async def _run_step_async(self, exe_context: _ExecutionContext, agent_span: trac
llm_span.set_content_tag("haystack.agent.step.llm.output", result)
llm_messages = result["replies"]
exe_context.state.set("messages", llm_messages)
_record_llm_usage(exe_context.state, llm_messages)

if not any(msg.tool_call for msg in llm_messages) or not self.tools:
exe_context.counter += 1
exe_context.state.set("step_count", exe_context.counter)
return False

modified_tool_call_messages, new_chat_history = await _process_confirmation_strategies_async(
Expand All @@ -876,13 +978,14 @@ async def _run_step_async(self, exe_context: _ExecutionContext, agent_span: trac
"haystack.agent.step.tool.output", {"tool_messages": tool_messages, "state": exe_context.state}
)
exe_context.state.set("messages", tool_messages)

if self.exit_conditions != ["text"] and self._check_exit_conditions(llm_messages, tool_messages):
exe_context.counter += 1
return False
_record_tool_calls(exe_context.state, tool_messages)

exe_context.counter += 1
return True
exe_context.state.set("step_count", exe_context.counter)
exit_triggered = self.exit_conditions != ["text"] and self._check_exit_conditions(
llm_messages, tool_messages
)
return not exit_triggered

def _check_exit_conditions(self, llm_messages: list[ChatMessage], tool_messages: list[ChatMessage]) -> bool:
"""
Expand Down
23 changes: 21 additions & 2 deletions haystack/components/generators/chat/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ def __init__(
)
component.set_input_type(self, "messages", list[ChatMessage], None)

# The Agent base class declares `step_count` and `tool_call_counts` as outputs, but an LLM never has tools
# and always runs exactly one step — those values are uninformative, so drop them from the public surface.
# `token_usage` is still meaningful and stays exposed.
component.set_output_types(
self, messages=list[ChatMessage], last_message=ChatMessage, token_usage=dict[str, Any]
)

def to_dict(self) -> dict[str, Any]:
"""
Serialize the LLM component to a dictionary.
Expand Down Expand Up @@ -140,16 +147,22 @@ def run( # type: ignore[override] # `messages` is in **kwargs to allow dynamic
A dictionary with the following keys:
- "messages": List of all messages exchanged during the LLM's run.
- "last_message": The last message exchanged during the LLM's run.
- "token_usage": Token usage from the LLM call (e.g. prompt_tokens, completion_tokens). Empty if the
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about adding this to the release note under enhancements? I suggest we do that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 22901c9

chat generator did not return usage data.
"""
# `messages` is intentionally omitted from the signature so the framework can treat it as required
# or optional depending on init configuration. See __init__ for details.
messages = kwargs.pop("messages", None)
return super(LLM, self).run( # noqa: UP008
result = super(LLM, self).run( # noqa: UP008
messages=messages or [],
streaming_callback=streaming_callback,
generation_kwargs=generation_kwargs,
**kwargs,
)
# Inherited Agent-internal bookkeeping that isn't useful at the LLM surface.
result.pop("step_count", None)
result.pop("tool_call_counts", None)
return result

async def run_async( # type: ignore[override] # `messages` is in **kwargs to allow dynamic required/optional status
self,
Expand All @@ -174,13 +187,19 @@ async def run_async( # type: ignore[override] # `messages` is in **kwargs to a
A dictionary with the following keys:
- "messages": List of all messages exchanged during the LLM's run.
- "last_message": The last message exchanged during the LLM's run.
- "token_usage": Token usage from the LLM call (e.g. prompt_tokens, completion_tokens). Empty if the
chat generator did not return usage data.
"""
# `messages` is intentionally omitted from the signature so the framework can treat it as required
# or optional depending on init configuration. See __init__ for details.
messages = kwargs.pop("messages", None)
return await super(LLM, self).run_async( # noqa: UP008
result = await super(LLM, self).run_async( # noqa: UP008
messages=messages or [],
streaming_callback=streaming_callback,
generation_kwargs=generation_kwargs,
**kwargs,
)
# Inherited Agent-internal bookkeeping that isn't useful at the LLM surface.
result.pop("step_count", None)
Copy link
Copy Markdown
Member

@julian-risch julian-risch May 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add another key to the Agent's _INTERNAL_STATE_KEYS, we need to remember to also pop it here. Just a comment. No change request.

result.pop("tool_call_counts", None)
return result
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
enhancements:
- |
``Agent`` now exposes three new outputs that are populated automatically during a
run and made available alongside ``messages`` and ``last_message`` in the result dict:

- ``step_count`` (``int``): the number of steps the agent ran.
- ``token_usage`` (``dict[str, Any]``): aggregated token usage summed across every LLM call in the run
- ``tool_call_counts`` (``dict[str, int]``): number of times each tool was invoked, keyed by tool name.

These fields are added to ``Agent.state_schema`` automatically so that tools registered via ``inputs_from_state`` can read them mid-run.
They are exposed only as Agent outputs so cannot be passed in as inputs to ``Agent.run`` / ``Agent.run_async``.
- |
``LLM`` now exposes a ``token_usage`` output alongside ``messages`` and ``last_message``. Because ``LLM`` never
invokes tools and always runs exactly one step, ``step_count`` and ``tool_call_counts`` inherited from ``Agent``
are not exposed on ``LLM``.
upgrade:
- |
``step_count``, ``token_usage``, and ``tool_call_counts`` are now reserved keys in ``Agent.state_schema``.
Passing any of them via the ``state_schema`` argument now raises ``ValueError``.
Rename the conflicting state key (e.g. ``my_token_usage``) to migrate.
Loading
Loading